Merge PR #795 (cluster-id and launcher cleanup)...
MinRK
r4852:3994edb7 merge
@@ -1,327 +1,328 b'IPython/config/configurable.py'
1 1 # encoding: utf-8
2 2 """
3 3 A base class for objects that are configurable.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * Fernando Perez
9 9 * Min RK
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 import datetime
24 24 from copy import deepcopy
25 25
26 26 from loader import Config
27 27 from IPython.utils.traitlets import HasTraits, Instance
28 28 from IPython.utils.text import indent, wrap_paragraphs
29 29
30 30
31 31 #-----------------------------------------------------------------------------
32 32 # Helper classes for Configurables
33 33 #-----------------------------------------------------------------------------
34 34
35 35
36 36 class ConfigurableError(Exception):
37 37 pass
38 38
39 39
40 40 class MultipleInstanceError(ConfigurableError):
41 41 pass
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Configurable implementation
45 45 #-----------------------------------------------------------------------------
46 46
47 47 class Configurable(HasTraits):
48 48
49 49 config = Instance(Config,(),{})
50 50 created = None
51 51
52 52 def __init__(self, **kwargs):
53 53 """Create a configurable given a config config.
54 54
55 55 Parameters
56 56 ----------
57 57 config : Config
58 58 If this is empty, default values are used. If config is a
59 59 :class:`Config` instance, it will be used to configure the
60 60 instance.
61 61
62 62 Notes
63 63 -----
64 64 Subclasses of Configurable must call the :meth:`__init__` method of
65 65 :class:`Configurable` *before* doing anything else and using
66 66 :func:`super`::
67 67
68 68 class MyConfigurable(Configurable):
69 69 def __init__(self, config=None):
70 70 super(MyConfigurable, self).__init__(config=config)
71 71 # Then any other code you need to finish initialization.
72 72
73 73 This ensures that instances will be configured properly.
74 74 """
75 75 config = kwargs.pop('config', None)
76 76 if config is not None:
77 77 # We used to deepcopy, but for now we are trying to just save
78 78 # by reference. This *could* have side effects as all components
79 79 # will share config. In fact, I did find such a side effect in
80 80 # _config_changed below. If a config attribute value was a mutable type
81 81 # all instances of a component were getting the same copy, effectively
82 82 # making that a class attribute.
83 83 # self.config = deepcopy(config)
84 84 self.config = config
85 85 # This should go second so individual keyword arguments override
86 86 # the values in config.
87 87 super(Configurable, self).__init__(**kwargs)
88 88 self.created = datetime.datetime.now()
89 89
90 90 #-------------------------------------------------------------------------
91 91 # Static trait notifications
92 92 #-------------------------------------------------------------------------
93 93
94 94 def _config_changed(self, name, old, new):
95 95 """Update all the class traits having ``config=True`` as metadata.
96 96
97 97 For any class trait with a ``config`` metadata attribute that is
98 98 ``True``, we update the trait with the value of the corresponding
99 99 config entry.
100 100 """
101 101 # Get all traits with a config metadata entry that is True
102 102 traits = self.traits(config=True)
103 103
104 104 # We auto-load config section for this class as well as any parent
105 105 # classes that are Configurable subclasses. This starts with Configurable
106 106 # and works down the mro loading the config for each section.
107 107 section_names = [cls.__name__ for cls in \
108 108 reversed(self.__class__.__mro__) if
109 109 issubclass(cls, Configurable) and issubclass(self.__class__, cls)]
110 110
111 111 for sname in section_names:
112 112 # Don't do a blind getattr as that would cause the config to
113 113 # dynamically create the section with name self.__class__.__name__.
114 114 if new._has_section(sname):
115 115 my_config = new[sname]
116 116 for k, v in traits.iteritems():
117 117 # Don't allow traitlets with config=True to start with
118 118 # uppercase. Otherwise, they are confused with Config
119 119 # subsections. But, developers shouldn't have uppercase
120 120 # attributes anyway! (PEP 8)
121 121 if k[0].upper()==k[0] and not k.startswith('_'):
122 122 raise ConfigurableError('Configurable traitlets with '
123 123 'config=True must start with a lowercase letter so they are '
124 124 'not confused with Config subsections: %s.%s' % \
125 125 (self.__class__.__name__, k))
126 126 try:
127 127 # Here we grab the value from the config
128 128 # If k has the naming convention of a config
129 129 # section, it will be auto created.
130 130 config_value = my_config[k]
131 131 except KeyError:
132 132 pass
133 133 else:
134 134 # print "Setting %s.%s from %s.%s=%r" % \
135 135 # (self.__class__.__name__,k,sname,k,config_value)
136 136 # We have to do a deepcopy here if we don't deepcopy the entire
137 137 # config object. If we don't, a mutable config_value will be
138 138 # shared by all instances, effectively making it a class attribute.
139 139 setattr(self, k, deepcopy(config_value))
140 140
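# A minimal sketch of the lookup performed above, using a hypothetical
# `Weather` subclass with one `temp` trait (names are illustrative only)::
#
#     >>> from IPython.utils.traitlets import Int
#     >>> class Weather(Configurable):
#     ...     temp = Int(0, config=True)
#     >>> c = Config()
#     >>> c.Weather.temp = 42
#     >>> Weather(config=c).temp
#     42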
141 141 @classmethod
142 142 def class_get_help(cls):
143 143 """Get the help string for this class in ReST format."""
144 144 cls_traits = cls.class_traits(config=True)
145 145 final_help = []
146 146 final_help.append(u'%s options' % cls.__name__)
147 147 final_help.append(len(final_help[0])*u'-')
148 148 for k,v in cls_traits.iteritems():
149 149 help = cls.class_get_trait_help(v)
150 150 final_help.append(help)
151 151 return '\n'.join(final_help)
152 152
153 153 @classmethod
154 154 def class_get_trait_help(cls, trait):
155 155 """Get the help string for a single trait."""
156 156 lines = []
157 157 header = "--%s.%s=<%s>" % (cls.__name__, trait.name, trait.__class__.__name__)
158 158 lines.append(header)
159 159 try:
160 160 dvr = repr(trait.get_default_value())
161 161 except Exception:
162 162 dvr = None # ignore defaults we can't construct
163 163 if dvr is not None:
164 164 if len(dvr) > 64:
165 165 dvr = dvr[:61]+'...'
166 166 lines.append(indent('Default: %s'%dvr, 4))
167 167 if 'Enum' in trait.__class__.__name__:
168 168 # include Enum choices
169 169 lines.append(indent('Choices: %r'%(trait.values,)))
170 170
171 171 help = trait.get_metadata('help')
172 172 if help is not None:
173 173 help = '\n'.join(wrap_paragraphs(help, 76))
174 174 lines.append(indent(help, 4))
175 175 return '\n'.join(lines)
176 176
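# For the hypothetical `Weather.temp` trait above, class_get_trait_help
# would render roughly (output illustrative, not exact)::
#
#     --Weather.temp=<Int>
#         Default: 0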
177 177 @classmethod
178 178 def class_print_help(cls):
179 179 """Get the help string for a single trait and print it."""
180 180 print cls.class_get_help()
181 181
182 182 @classmethod
183 183 def class_config_section(cls):
184 184 """Get the config class config section"""
185 185 def c(s):
186 186 """return a commented, wrapped block."""
187 187 s = '\n\n'.join(wrap_paragraphs(s, 78))
188 188
189 189 return '# ' + s.replace('\n', '\n# ')
190 190
191 191 # section header
192 192 breaker = '#' + '-'*78
193 193 s = "# %s configuration"%cls.__name__
194 194 lines = [breaker, s, breaker, '']
195 195 # get the description trait
196 196 desc = cls.class_traits().get('description')
197 197 if desc:
198 198 desc = desc.default_value
199 199 else:
200 200 # no description trait, use __doc__
201 201 desc = getattr(cls, '__doc__', '')
202 202 if desc:
203 203 lines.append(c(desc))
204 204 lines.append('')
205 205
206 206 parents = []
207 207 for parent in cls.mro():
208 208 # only include parents that are not base classes
209 209 # and are not the class itself
210 if issubclass(parent, Configurable) and \
211 not parent in (Configurable, SingletonConfigurable, cls):
210 # and have some configurable traits to inherit
211 if parent is not cls and issubclass(parent, Configurable) and \
212 parent.class_traits(config=True):
212 213 parents.append(parent)
213 214
214 215 if parents:
215 216 pstr = ', '.join([ p.__name__ for p in parents ])
216 217 lines.append(c('%s will inherit config from: %s'%(cls.__name__, pstr)))
217 218 lines.append('')
218 219
219 220 for name,trait in cls.class_traits(config=True).iteritems():
220 221 help = trait.get_metadata('help') or ''
221 222 lines.append(c(help))
222 223 lines.append('# c.%s.%s = %r'%(cls.__name__, name, trait.get_default_value()))
223 224 lines.append('')
224 225 return '\n'.join(lines)
225 226
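# For the hypothetical `Weather` class above, class_config_section would
# emit roughly (output illustrative, breaker shortened here)::
#
#     #---------------------------------------
#     # Weather configuration
#     #---------------------------------------
#
#     # c.Weather.temp = 0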
226 227
227 228
228 229 class SingletonConfigurable(Configurable):
229 230 """A configurable that only allows one instance.
230 231
231 232 This class is for classes that should only have one instance of itself
232 233 or *any* subclass. To create and retrieve such a class use the
233 234 :meth:`SingletonConfigurable.instance` method.
234 235 """
235 236
236 237 _instance = None
237 238
238 239 @classmethod
239 240 def _walk_mro(cls):
240 241 """Walk the cls.mro() for parent classes that are also singletons
241 242
242 243 For use in instance()
243 244 """
244 245
245 246 for subclass in cls.mro():
246 247 if issubclass(cls, subclass) and \
247 248 issubclass(subclass, SingletonConfigurable) and \
248 249 subclass != SingletonConfigurable:
249 250 yield subclass
250 251
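# For a hypothetical chain ``class A(SingletonConfigurable)`` /
# ``class B(A)``, walking B's mro yields B first, then A::
#
#     >>> class A(SingletonConfigurable): pass
#     >>> class B(A): pass
#     >>> [klass.__name__ for klass in B._walk_mro()]
#     ['B', 'A']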
251 252 @classmethod
252 253 def clear_instance(cls):
253 254 """unset _instance for this class and singleton parents.
254 255 """
255 256 if not cls.initialized():
256 257 return
257 258 for subclass in cls._walk_mro():
258 259 if isinstance(subclass._instance, cls):
259 260 # only clear instances that are instances
260 261 # of the calling class
261 262 subclass._instance = None
262 263
263 264 @classmethod
264 265 def instance(cls, *args, **kwargs):
265 266 """Returns a global instance of this class.
266 267
267 268 This method creates a new instance if none has previously been
268 269 created, and returns the previously created instance if one already exists.
269 270
270 271 The arguments and keyword arguments passed to this method are passed
271 272 on to the :meth:`__init__` method of the class upon instantiation.
272 273
273 274 Examples
274 275 --------
275 276
276 277 Create a singleton class using instance, and retrieve it::
277 278
278 279 >>> from IPython.config.configurable import SingletonConfigurable
279 280 >>> class Foo(SingletonConfigurable): pass
280 281 >>> foo = Foo.instance()
281 282 >>> foo == Foo.instance()
282 283 True
283 284
284 285 Create a subclass that is retrieved using the base class instance::
285 286
286 287 >>> class Bar(SingletonConfigurable): pass
287 288 >>> class Bam(Bar): pass
288 289 >>> bam = Bam.instance()
289 290 >>> bam == Bar.instance()
290 291 True
291 292 """
292 293 # Create and save the instance
293 294 if cls._instance is None:
294 295 inst = cls(*args, **kwargs)
295 296 # Now make sure that the instance will also be returned by
296 297 # parent classes' _instance attribute.
297 298 for subclass in cls._walk_mro():
298 299 subclass._instance = inst
299 300
300 301 if isinstance(cls._instance, cls):
301 302 return cls._instance
302 303 else:
303 304 raise MultipleInstanceError(
304 305 'Multiple incompatible subclass instances of '
305 306 '%s are being created.' % cls.__name__
306 307 )
307 308
308 309 @classmethod
309 310 def initialized(cls):
310 311 """Has an instance been created?"""
311 312 return hasattr(cls, "_instance") and cls._instance is not None
312 313
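# The MultipleInstanceError raised above guards against incompatible
# siblings: once a base singleton holds an instance, a subclass that the
# instance does not belong to cannot claim it (hypothetical names)::
#
#     >>> class Base(SingletonConfigurable): pass
#     >>> class Sub(Base): pass
#     >>> base = Base.instance()
#     >>> Sub.instance()
#     Traceback (most recent call last):
#     ...
#     MultipleInstanceError: Multiple incompatible subclass instances of Sub are being created.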
313 314
314 315 class LoggingConfigurable(Configurable):
315 316 """A parent class for Configurables that log.
316 317
317 318 Subclasses have a log trait, and the default behavior
318 319 is to get the logger from the currently running Application
319 320 via Application.instance().log.
320 321 """
321 322
322 323 log = Instance('logging.Logger')
323 324 def _log_default(self):
324 325 from IPython.config.application import Application
325 326 return Application.instance().log
326 327
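# A minimal sketch of a LoggingConfigurable subclass; outside a running
# Application you can supply the logger yourself (the `Worker` name is
# illustrative)::
#
#     import logging
#     class Worker(LoggingConfigurable):
#         def run(self):
#             self.log.info("working")
#     w = Worker(log=logging.getLogger("worker"))
#     w.run()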
327 328
@@ -1,244 +1,261 b'IPython/parallel/apps/baseapp.py'
1 1 # encoding: utf-8
2 2 """
3 3 The Base Application class for IPython.parallel apps
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * Min RK
9 9
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 from __future__ import with_statement
24 24
25 25 import os
26 26 import logging
27 27 import re
28 28 import sys
29 29
30 30 from subprocess import Popen, PIPE
31 31
32 32 from IPython.core import release
33 33 from IPython.core.crashhandler import CrashHandler
34 34 from IPython.core.application import (
35 35 BaseIPythonApplication,
36 36 base_aliases as base_ip_aliases,
37 37 base_flags as base_ip_flags
38 38 )
39 39 from IPython.utils.path import expand_path
40 40
41 41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Module errors
45 45 #-----------------------------------------------------------------------------
46 46
47 47 class PIDFileError(Exception):
48 48 pass
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Crash handler for this application
53 53 #-----------------------------------------------------------------------------
54 54
55 55 class ParallelCrashHandler(CrashHandler):
56 56 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
57 57
58 58 def __init__(self, app):
59 59 contact_name = release.authors['Min'][0]
60 60 contact_email = release.authors['Min'][1]
61 61 bug_tracker = 'http://github.com/ipython/ipython/issues'
62 62 super(ParallelCrashHandler,self).__init__(
63 63 app, contact_name, contact_email, bug_tracker
64 64 )
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Main application
69 69 #-----------------------------------------------------------------------------
70 70 base_aliases = {}
71 71 base_aliases.update(base_ip_aliases)
72 72 base_aliases.update({
73 73 'profile-dir' : 'ProfileDir.location',
74 74 'work-dir' : 'BaseParallelApplication.work_dir',
75 75 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 76 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 77 'log-url' : 'BaseParallelApplication.log_url',
78 'cluster-id' : 'BaseParallelApplication.cluster_id',
78 79 })
79 80
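# Each alias above maps a command-line option to a configurable trait, so
# (illustrative invocation)::
#
#     ipcontroller --cluster-id=mycluster --log-to-file=True
#
# is shorthand for setting BaseParallelApplication.cluster_id and
# BaseParallelApplication.log_to_file.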
80 81 base_flags = {
81 82 'log-to-file' : (
82 83 {'BaseParallelApplication' : {'log_to_file' : True}},
83 84 "send log output to a file"
84 85 )
85 86 }
86 87 base_flags.update(base_ip_flags)
87 88
88 89 class BaseParallelApplication(BaseIPythonApplication):
89 90 """The base Application for IPython.parallel apps
90 91
91 92 Principal extensions to BaseIPythonApplication:
92 93
93 94 * work_dir
94 95 * remote logging via pyzmq
95 96 * IOLoop instance
96 97 """
97 98
98 99 crash_handler_class = ParallelCrashHandler
99 100
100 101 def _log_level_default(self):
101 102 # temporarily override default_log_level to INFO
102 103 return logging.INFO
103 104
104 105 work_dir = Unicode(os.getcwdu(), config=True,
105 106 help='Set the working dir for the process.'
106 107 )
107 108 def _work_dir_changed(self, name, old, new):
108 109 self.work_dir = unicode(expand_path(new))
109 110
110 111 log_to_file = Bool(config=True,
111 112 help="whether to log to a file")
112 113
113 114 clean_logs = Bool(False, config=True,
114 115 help="whether to cleanup old logfiles before starting")
115 116
116 117 log_url = Unicode('', config=True,
117 118 help="The ZMQ URL of the iplogger to aggregate logging.")
118 119
120 cluster_id = Unicode('', config=True,
121 help="""String id to add to runtime files, to prevent name collisions when
122 using multiple clusters with a single profile simultaneously.
123
124 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
125
126 Since this is text inserted into filenames, typical recommendations apply:
127 Simple character strings are ideal, and spaces are not recommended (but should
128 generally work).
129 """
130 )
131 def _cluster_id_changed(self, name, old, new):
132 self.name = self.__class__.name
133 if new:
134 self.name += '-%s'%new
135
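# Effect of the handler above: with no cluster_id the app keeps its class
# name (e.g. u'ipcontroller'); with --cluster-id=prod it becomes
# 'ipcontroller-prod', so pid and log files from concurrent clusters in a
# single profile no longer collide (illustrative)::
#
#     ipcontroller --cluster-id=prod    # pid file: ipcontroller-prod.pid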
119 136 def _config_files_default(self):
120 137 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
121 138
122 139 loop = Instance('zmq.eventloop.ioloop.IOLoop')
123 140 def _loop_default(self):
124 141 from zmq.eventloop.ioloop import IOLoop
125 142 return IOLoop.instance()
126 143
127 144 aliases = Dict(base_aliases)
128 145 flags = Dict(base_flags)
129 146
130 147 def initialize(self, argv=None):
131 148 """initialize the app"""
132 149 super(BaseParallelApplication, self).initialize(argv)
133 150 self.to_work_dir()
134 151 self.reinit_logging()
135 152
136 153 def to_work_dir(self):
137 154 wd = self.work_dir
138 155 if unicode(wd) != os.getcwdu():
139 156 os.chdir(wd)
140 157 self.log.info("Changing to working dir: %s" % wd)
141 158 # This is the working dir by now.
142 159 sys.path.insert(0, '')
143 160
144 161 def reinit_logging(self):
145 162 # Remove old log files
146 163 log_dir = self.profile_dir.log_dir
147 164 if self.clean_logs:
148 165 for f in os.listdir(log_dir):
149 166 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
150 167 os.remove(os.path.join(log_dir, f))
151 168 if self.log_to_file:
152 169 # Start logging to the new log file
153 170 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
154 171 logfile = os.path.join(log_dir, log_filename)
155 172 open_log_file = open(logfile, 'w')
156 173 else:
157 174 open_log_file = None
158 175 if open_log_file is not None:
159 176 self.log.removeHandler(self._log_handler)
160 177 self._log_handler = logging.StreamHandler(open_log_file)
161 178 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
162 179 self._log_handler.setFormatter(self._log_formatter)
163 180 self.log.addHandler(self._log_handler)
164 181 # do not propagate log messages to root logger
165 182 # ipcluster app will sometimes print duplicate messages during shutdown
166 183 # if this is 1 (default):
167 184 self.log.propagate = False
168 185
169 186 def write_pid_file(self, overwrite=False):
170 187 """Create a .pid file in the pid_dir with my pid.
171 188
172 189 This must be called after initialization has set `self.profile_dir`.
173 190 This raises :exc:`PIDFileError` if the pid file exists already.
174 191 """
175 192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
176 193 if os.path.isfile(pid_file):
177 194 pid = self.get_pid_from_file()
178 195 if not overwrite:
179 196 raise PIDFileError(
180 197 'The pid file [%s] already exists. \nThis could mean that this '
181 198 'server is already running with [pid=%s].' % (pid_file, pid)
182 199 )
183 200 with open(pid_file, 'w') as f:
184 201 self.log.info("Creating pid file: %s" % pid_file)
185 202 f.write(repr(os.getpid())+'\n')
186 203
187 204 def remove_pid_file(self):
188 205 """Remove the pid file.
189 206
190 207 This should be called at shutdown by registering a callback with
191 208 :func:`reactor.addSystemEventTrigger`. This needs to return
192 209 ``None``.
193 210 """
194 211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
195 212 if os.path.isfile(pid_file):
196 213 try:
197 214 self.log.info("Removing pid file: %s" % pid_file)
198 215 os.remove(pid_file)
199 216 except OSError:
200 217 self.log.warn("Error removing the pid file: %s" % pid_file)
201 218
202 219 def get_pid_from_file(self):
203 220 """Get the pid from the pid file.
204 221
205 222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
206 223 """
207 224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 225 if os.path.isfile(pid_file):
209 226 with open(pid_file, 'r') as f:
210 227 s = f.read().strip()
211 228 try:
212 229 pid = int(s)
213 230 except ValueError:
214 231 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
215 232 return pid
216 233 else:
217 234 raise PIDFileError('pid file not found: %s' % pid_file)
218 235
219 236 def check_pid(self, pid):
220 237 if os.name == 'nt':
221 238 try:
222 239 import ctypes
223 240 # returns 0 if no such process (of ours) exists
224 241 # positive int otherwise
225 242 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
226 243 except Exception:
227 244 self.log.warn(
228 245 "Could not determine whether pid %i is running via `OpenProcess`. "
229 246 " Making the likely assumption that it is."%pid
230 247 )
231 248 return True
232 249 return bool(p)
233 250 else:
234 251 try:
235 252 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
236 253 output,_ = p.communicate()
237 254 except OSError:
238 255 self.log.warn(
239 256 "Could not determine whether pid %i is running via `ps x`. "
240 257 " Making the likely assumption that it is."%pid
241 258 )
242 259 return True
243 260 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
244 261 return pid in pids
@@ -1,529 +1,528 b'IPython/parallel/apps/ipclusterapp.py'
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag
35 35 from IPython.config.loader import Config
36 36 from IPython.core.application import BaseIPythonApplication
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.utils.daemonize import daemonize
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.sysinfo import num_cpus
41 41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
42 42 DottedObjectName)
43 43
44 44 from IPython.parallel.apps.baseapp import (
45 45 BaseParallelApplication,
46 46 PIDFileError,
47 47 base_flags, base_aliases
48 48 )
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Module level variables
53 53 #-----------------------------------------------------------------------------
54 54
55 55
56 56 default_config_file_name = u'ipcluster_config.py'
57 57
58 58
59 59 _description = """Start an IPython cluster for parallel computing.
60 60
61 61 An IPython cluster consists of 1 controller and 1 or more engines.
62 62 This command automates the startup of these processes using a wide
63 63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 68 """
69 69
70 70 _main_examples = """
71 71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 72 ipcluster start -h # show the help string for the start subcmd
73 73
74 74 ipcluster stop -h # show the help string for the stop subcmd
75 75 ipcluster engines -h # show the help string for the engines subcmd
76 76 """
77 77
78 78 _start_examples = """
79 79 ipython profile create mycluster --parallel # create mycluster profile
80 80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 81 """
82 82
83 83 _stop_examples = """
84 84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 85 """
86 86
87 87 _engines_examples = """
88 88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 89 """
90 90
91 91
92 92 # Exit codes for ipcluster
93 93
94 94 # This will be the exit code if the ipcluster appears to be running because
95 95 # a .pid file exists
96 96 ALREADY_STARTED = 10
97 97
98 98
99 99 # This will be the exit code if ipcluster stop is run, but there is no .pid
100 100 # file to be found.
101 101 ALREADY_STOPPED = 11
102 102
103 103 # This will be the exit code if ipcluster engines is run, but there is no .pid
104 104 # file to be found.
105 105 NO_CLUSTER = 12
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Main application
110 110 #-----------------------------------------------------------------------------
111 111 start_help = """Start an IPython cluster for parallel computing
112 112
113 113 Start an ipython cluster by its profile name or cluster
114 114 directory. Cluster directories contain configuration, log and
115 115 security related files and are named using the convention
116 116 'profile_<name>' and are created using 'ipython profile create
117 117 <name> --parallel'. If your cluster directory is in
118 118 the cwd or the ipython directory, you can simply refer to it
119 119 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
120 120 otherwise use the '--profile-dir' option.
121 121 """
122 122 stop_help = """Stop a running IPython cluster
123 123
124 124 Stop a running ipython cluster by its profile name or cluster
125 125 directory. Cluster directories are named using the convention
126 126 'profile_<name>'. If your cluster directory is in
127 127 the cwd or the ipython directory, you can simply refer to it
128 128 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
129 129 use the '--profile-dir' option.
130 130 """
131 131 engines_help = """Start engines connected to an existing IPython cluster
132 132
133 133 Start one or more engines to connect to an existing Cluster
134 134 by profile name or cluster directory.
135 135 Cluster directories contain configuration, log and
136 136 security related files and are named using the convention
137 137 'profile_<name>' and are created using 'ipython profile create
138 138 <name> --parallel'. If your cluster directory is in
139 139 the cwd or the ipython directory, you can simply refer to it
140 140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
141 141 otherwise use the '--profile-dir' option.
142 142 """
143 143 stop_aliases = dict(
144 144 signal='IPClusterStop.signal',
145 145 )
146 146 stop_aliases.update(base_aliases)
147 147
148 148 class IPClusterStop(BaseParallelApplication):
149 149 name = u'ipcluster'
150 150 description = stop_help
151 151 examples = _stop_examples
152 152 config_file_name = Unicode(default_config_file_name)
153 153
154 154 signal = Int(signal.SIGINT, config=True,
155 155 help="signal to use for stopping processes.")
156 156
157 157 aliases = Dict(stop_aliases)
158 158
159 159 def start(self):
160 160 """Start the app for the stop subcommand."""
161 161 try:
162 162 pid = self.get_pid_from_file()
163 163 except PIDFileError:
164 164 self.log.critical(
165 165 'Could not read pid file, cluster is probably not running.'
166 166 )
167 167 # Here I exit with an unusual exit status that other processes
168 168 # can watch for to learn how I exited.
169 169 self.remove_pid_file()
170 170 self.exit(ALREADY_STOPPED)
171 171
172 172 if not self.check_pid(pid):
173 173 self.log.critical(
174 174 'Cluster [pid=%r] is not running.' % pid
175 175 )
176 176 self.remove_pid_file()
177 177 # Here I exit with an unusual exit status that other processes
178 178 # can watch for to learn how I exited.
179 179 self.exit(ALREADY_STOPPED)
180 180
181 181 elif os.name=='posix':
182 182 sig = self.signal
183 183 self.log.info(
184 184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
185 185 )
186 186 try:
187 187 os.kill(pid, sig)
188 188 except OSError:
189 189 self.log.error("Stopping cluster failed, assuming already dead.",
190 190 exc_info=True)
191 191 self.remove_pid_file()
192 192 elif os.name=='nt':
193 193 try:
194 194 # kill the whole tree
195 195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
196 196 except (CalledProcessError, OSError):
197 197 self.log.error("Stopping cluster failed, assuming already dead.",
198 198 exc_info=True)
199 199 self.remove_pid_file()
200 200
201 201 engine_aliases = {}
202 202 engine_aliases.update(base_aliases)
203 203 engine_aliases.update(dict(
204 204 n='IPClusterEngines.n',
205 205 engines = 'IPClusterEngines.engine_launcher_class',
206 206 daemonize = 'IPClusterEngines.daemonize',
207 207 ))
208 208 engine_flags = {}
209 209 engine_flags.update(base_flags)
210 210
211 211 engine_flags.update(dict(
212 212 daemonize=(
213 213 {'IPClusterEngines' : {'daemonize' : True}},
214 214 """run the cluster into the background (not available on Windows)""",
215 215 )
216 216 ))
217 217 class IPClusterEngines(BaseParallelApplication):
218 218
219 219 name = u'ipcluster'
220 220 description = engines_help
221 221 examples = _engines_examples
222 222 usage = None
223 223 config_file_name = Unicode(default_config_file_name)
224 224 default_log_level = logging.INFO
225 225 classes = List()
226 226 def _classes_default(self):
227 227 from IPython.parallel.apps import launcher
228 228 launchers = launcher.all_launchers
229 229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
230 230 return [ProfileDir]+eslaunchers
231 231
232 232 n = Int(num_cpus(), config=True,
233 233 help="""The number of engines to start. The default is to use one for each
234 234 CPU on your machine""")
235 235
236 236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
237 237 config=True,
238 238 help="""The class for launching a set of Engines. Change this value
239 239 to use various batch systems to launch your engines, such as PBS, SGE, MPIExec, etc.
240 240 Each launcher class has its own set of configuration options, for making sure
241 241 it will work in your environment.
242 242
243 243 You can also write your own launcher and specify its absolute import path,
244 244 as in 'mymodule.launcher.FTLEnginesLauncher'.
245 245
246 246 Examples include:
247 247
248 248 LocalEngineSetLauncher : start engines locally as subprocesses [default]
249 249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
250 250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
251 251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
252 252 SSHEngineSetLauncher : use SSH to start the engines
253 253 Note that SSH does *not* move the connection files
254 254 around, so you will likely have to do this manually
255 255 unless the machines are on a shared file system.
256 256 WindowsHPCEngineSetLauncher : use Windows HPC
257 257 """
258 258 )
259 259 daemonize = Bool(False, config=True,
260 260 help="""Daemonize the ipcluster program. This implies --log-to-file.
261 261 Not available on Windows.
262 262 """)
263 263
264 264 def _daemonize_changed(self, name, old, new):
265 265 if new:
266 266 self.log_to_file = True
267 267
268 268 aliases = Dict(engine_aliases)
269 269 flags = Dict(engine_flags)
270 270 _stopping = False
271 271
272 272 def initialize(self, argv=None):
273 273 super(IPClusterEngines, self).initialize(argv)
274 274 self.init_signal()
275 275 self.init_launchers()
276 276
277 277 def init_launchers(self):
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
279 279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
280 280
281 281 def init_signal(self):
282 282 # Setup signals
283 283 signal.signal(signal.SIGINT, self.sigint_handler)
284 284
285 def build_launcher(self, clsname):
285 def build_launcher(self, clsname, kind=None):
286 286 """import and instantiate a Launcher based on importstring"""
287 287 if '.' not in clsname:
288 288 # not a module, presume it's the raw name in apps.launcher
289 if kind and kind not in clsname:
290 # not a full launcher class name; assume it's just a
291 # 'PBS'- or 'MPIExec'-style prefix:
292 clsname = clsname + kind + 'Launcher'
289 293 clsname = 'IPython.parallel.apps.launcher.'+clsname
290 # print repr(clsname)
291 294 try:
292 295 klass = import_item(clsname)
293 296 except (ImportError, KeyError):
294 297 self.log.fatal("Could not import launcher class: %r"%clsname)
295 298 self.exit(1)
296 299
297 300 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log
301 work_dir=u'.', config=self.config, log=self.log,
302 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
299 303 )
300 304 return launcher
301 305
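# The shorthand handled above means all of these name the same launcher
# class (illustrative invocations)::
#
#     ipcluster engines --engines=MPIExec
#     ipcluster engines --engines=MPIExecEngineSetLauncher
#     ipcluster engines --engines=IPython.parallel.apps.launcher.MPIExecEngineSetLauncher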
302 306 def start_engines(self):
303 307 self.log.info("Starting %i engines"%self.n)
304 self.engine_launcher.start(
305 self.n,
306 self.profile_dir.location
307 )
308 self.engine_launcher.start(self.n)
308 309
309 310 def stop_engines(self):
310 311 self.log.info("Stopping Engines...")
311 312 if self.engine_launcher.running:
312 313 d = self.engine_launcher.stop()
313 314 return d
314 315 else:
315 316 return None
316 317
317 318 def stop_launchers(self, r=None):
318 319 if not self._stopping:
319 320 self._stopping = True
320 321 self.log.error("IPython cluster: stopping")
321 322 self.stop_engines()
322 323 # Wait a few seconds to let things shut down.
323 324 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
324 325 dc.start()
325 326
326 327 def sigint_handler(self, signum, frame):
327 328 self.log.debug("SIGINT received, stopping launchers...")
328 329 self.stop_launchers()
329 330
330 331 def start_logging(self):
331 332 # Remove old log files of the controller and engine
332 333 if self.clean_logs:
333 334 log_dir = self.profile_dir.log_dir
334 335 for f in os.listdir(log_dir):
335 336 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
336 337 os.remove(os.path.join(log_dir, f))
337 338 # This will remove old log files for ipcluster itself
338 339 # super(IPBaseParallelApplication, self).start_logging()
339 340
340 341 def start(self):
341 342 """Start the app for the engines subcommand."""
342 343 self.log.info("IPython cluster: started")
343 344 # First see if the cluster is already running
344 345
345 346 # Now log and daemonize
346 347 self.log.info(
347 348 'Starting engines with [daemon=%r]' % self.daemonize
348 349 )
349 350 # TODO: Get daemonize working on Windows or as a Windows Server.
350 351 if self.daemonize:
351 352 if os.name=='posix':
352 353 daemonize()
353 354
354 355 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
355 356 dc.start()
356 357 # Now write the new pid file AFTER our new forked pid is active.
357 358 # self.write_pid_file()
358 359 try:
359 360 self.loop.start()
360 361 except KeyboardInterrupt:
361 362 pass
362 363 except zmq.ZMQError as e:
363 364 if e.errno == errno.EINTR:
364 365 pass
365 366 else:
366 367 raise
367 368
368 369 start_aliases = {}
369 370 start_aliases.update(engine_aliases)
370 371 start_aliases.update(dict(
371 372 delay='IPClusterStart.delay',
372 373 controller = 'IPClusterStart.controller_launcher_class',
373 374 ))
374 375 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
375 376
376 377 # set inherited Start keys directly, to ensure command-line args get higher priority
377 378 # than config file options.
378 379 for key,value in start_aliases.items():
379 380 if value.startswith('IPClusterEngines'):
380 381 start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
381 382
382 383 class IPClusterStart(IPClusterEngines):
383 384
384 385 name = u'ipcluster'
385 386 description = start_help
386 387 examples = _start_examples
387 388 default_log_level = logging.INFO
388 389 auto_create = Bool(True, config=True,
389 390 help="whether to create the profile_dir if it doesn't exist")
390 391 classes = List()
391 392 def _classes_default(self):
392 393 from IPython.parallel.apps import launcher
393 394 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
394 395
395 396 clean_logs = Bool(True, config=True,
396 397 help="whether to cleanup old logs before starting")
397 398
398 399 delay = CFloat(1., config=True,
399 400 help="delay (in s) between starting the controller and the engines")
400 401
401 402 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
402 403 config=True,
403 404 help="""The class for launching a Controller. Change this value if you want
404 405 your controller to also be launched by a batch system, such as PBS, SGE, MPIExec, etc.
405 406
406 407 Each launcher class has its own set of configuration options, for making sure
407 408 it will work in your environment.
408 409
409 410 Examples include:
410 411
411 412 LocalControllerLauncher : start the controller locally as a subprocess
412 413 MPIExecControllerLauncher : use mpiexec to launch the controller in an MPI universe
413 414 PBSControllerLauncher : use PBS (qsub) to submit the controller to a batch queue
414 415 SGEControllerLauncher : use SGE (qsub) to submit the controller to a batch queue
415 416 SSHControllerLauncher : use SSH to start the controller
416 417 WindowsHPCControllerLauncher : use Windows HPC
417 418 """
418 419 )
419 420 reset = Bool(False, config=True,
420 421 help="Whether to reset config files as part of '--create'."
421 422 )
422 423
423 424 # flags = Dict(flags)
424 425 aliases = Dict(start_aliases)
425 426
426 427 def init_launchers(self):
427 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
428 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
428 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
429 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
429 430 self.controller_launcher.on_stop(self.stop_launchers)
430 431
431 432 def start_controller(self):
432 self.controller_launcher.start(
433 self.profile_dir.location
434 )
433 self.controller_launcher.start()
435 434
436 435 def stop_controller(self):
437 436 # self.log.info("In stop_controller")
438 437 if self.controller_launcher and self.controller_launcher.running:
439 438 return self.controller_launcher.stop()
440 439
441 440 def stop_launchers(self, r=None):
442 441 if not self._stopping:
443 442 self.stop_controller()
444 443 super(IPClusterStart, self).stop_launchers()
445 444
446 445 def start(self):
447 446 """Start the app for the start subcommand."""
448 447 # First see if the cluster is already running
449 448 try:
450 449 pid = self.get_pid_from_file()
451 450 except PIDFileError:
452 451 pass
453 452 else:
454 453 if self.check_pid(pid):
455 454 self.log.critical(
456 455 'Cluster is already running with [pid=%s]. '
457 456 'use "ipcluster stop" to stop the cluster.' % pid
458 457 )
459 458 # Here I exit with an unusual exit status that other processes
460 459 # can watch for to learn how I exited.
461 460 self.exit(ALREADY_STARTED)
462 461 else:
463 462 self.remove_pid_file()
464 463
465 464
466 465 # Now log and daemonize
467 466 self.log.info(
468 467 'Starting ipcluster with [daemon=%r]' % self.daemonize
469 468 )
470 469 # TODO: Get daemonize working on Windows or as a Windows Server.
471 470 if self.daemonize:
472 471 if os.name=='posix':
473 472 daemonize()
474 473
475 474 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
476 475 dc.start()
477 476 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
478 477 dc.start()
479 478 # Now write the new pid file AFTER our new forked pid is active.
480 479 self.write_pid_file()
481 480 try:
482 481 self.loop.start()
483 482 except KeyboardInterrupt:
484 483 pass
485 484 except zmq.ZMQError as e:
486 485 if e.errno == errno.EINTR:
487 486 pass
488 487 else:
489 488 raise
490 489 finally:
491 490 self.remove_pid_file()
492 491
493 492 base='IPython.parallel.apps.ipclusterapp.IPCluster'
494 493
495 494 class IPClusterApp(Application):
496 495 name = u'ipcluster'
497 496 description = _description
498 497 examples = _main_examples
499 498
500 499 subcommands = {
501 500 'start' : (base+'Start', start_help),
502 501 'stop' : (base+'Stop', stop_help),
503 502 'engines' : (base+'Engines', engines_help),
504 503 }
505 504
506 505 # no aliases or flags for parent App
507 506 aliases = Dict()
508 507 flags = Dict()
509 508
510 509 def start(self):
511 510 if self.subapp is None:
512 511 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
513 512 print
514 513 self.print_description()
515 514 self.print_subcommands()
516 515 self.exit(1)
517 516 else:
518 517 return self.subapp.start()
519 518
520 519 def launch_new_instance():
521 520 """Create and run the IPython cluster."""
522 521 app = IPClusterApp.instance()
523 522 app.initialize()
524 523 app.start()
525 524
526 525
527 526 if __name__ == '__main__':
528 527 launch_new_instance()
529 528
@@ -1,441 +1,452 b'IPython/parallel/apps/ipcontrollerapp.py'
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import socket
28 28 import stat
29 29 import sys
30 30 import uuid
31 31
32 32 from multiprocessing import Process
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37 from zmq.utils import jsonapi as json
38 38
39 39 from IPython.config.application import boolean_flag
40 40 from IPython.core.profiledir import ProfileDir
41 41
42 42 from IPython.parallel.apps.baseapp import (
43 43 BaseParallelApplication,
44 44 base_aliases,
45 45 base_flags,
46 46 )
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
49 49
50 50 # from IPython.parallel.controller.controller import ControllerFactory
51 51 from IPython.zmq.session import Session
52 52 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 53 from IPython.parallel.controller.hub import HubFactory
54 54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 56
57 57 from IPython.parallel.util import signal_children, split_url, asbytes
58 58
59 59 # conditional import of MongoDB backend class
60 60
61 61 try:
62 62 from IPython.parallel.controller.mongodb import MongoDB
63 63 except ImportError:
64 64 maybe_mongo = []
65 65 else:
66 66 maybe_mongo = [MongoDB]
67 67
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Module level variables
71 71 #-----------------------------------------------------------------------------
72 72
73 73
74 74 #: The default config file name for this application
75 75 default_config_file_name = u'ipcontroller_config.py'
76 76
77 77
78 78 _description = """Start the IPython controller for parallel computing.
79 79
80 80 The IPython controller provides a gateway between the IPython engines and
81 81 clients. The controller needs to be started before the engines and can be
82 82 configured using command line options or using a cluster directory. Cluster
83 83 directories contain config, log and security files and are usually located in
84 84 your ipython directory and named using the convention "profile_<name>". See the `profile`
85 85 and `profile-dir` options for details.
86 86 """
87 87
88 88 _examples = """
89 89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 90 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 91 """
92 92
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # The main application
96 96 #-----------------------------------------------------------------------------
97 97 flags = {}
98 98 flags.update(base_flags)
99 99 flags.update({
100 100 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
101 101 'Use threads instead of processes for the schedulers'),
102 102 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
103 103 'use the SQLiteDB backend'),
104 104 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
105 105 'use the MongoDB backend'),
106 106 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
107 107 'use the in-memory DictDB backend'),
108 108 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
109 109 'reuse existing json connection files')
110 110 })
111 111
112 112 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
113 113 "Use HMAC digests for authentication of messages.",
114 114 "Don't authenticate messages."
115 115 ))
116 116 aliases = dict(
117 117 secure = 'IPControllerApp.secure',
118 118 ssh = 'IPControllerApp.ssh_server',
119 119 enginessh = 'IPControllerApp.engine_ssh_server',
120 120 location = 'IPControllerApp.location',
121 121
122 122 ident = 'Session.session',
123 123 user = 'Session.username',
124 124 keyfile = 'Session.keyfile',
125 125
126 126 url = 'HubFactory.url',
127 127 ip = 'HubFactory.ip',
128 128 transport = 'HubFactory.transport',
129 129 port = 'HubFactory.regport',
130 130
131 131 ping = 'HeartMonitor.period',
132 132
133 133 scheme = 'TaskScheduler.scheme_name',
134 134 hwm = 'TaskScheduler.hwm',
135 135 )
136 136 aliases.update(base_aliases)
137 137
138 138
139 139 class IPControllerApp(BaseParallelApplication):
140 140
141 141 name = u'ipcontroller'
142 142 description = _description
143 143 examples = _examples
144 144 config_file_name = Unicode(default_config_file_name)
145 145 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
146 146
147 147 # change default to True
148 148 auto_create = Bool(True, config=True,
149 149 help="""Whether to create profile dir if it doesn't exist.""")
150 150
151 151 reuse_files = Bool(False, config=True,
152 152 help='Whether to reuse existing json connection files.'
153 153 )
154 154 secure = Bool(True, config=True,
155 155 help='Whether to use HMAC digests for extra message authentication.'
156 156 )
157 157 ssh_server = Unicode(u'', config=True,
158 158 help="""ssh url for clients to use when connecting to the Controller
159 159 processes. It should be of the form: [user@]server[:port]. The
160 160 Controller's listening addresses must be accessible from the ssh server""",
161 161 )
162 162 engine_ssh_server = Unicode(u'', config=True,
163 163 help="""ssh url for engines to use when connecting to the Controller
164 164 processes. It should be of the form: [user@]server[:port]. The
165 165 Controller's listening addresses must be accessible from the ssh server""",
166 166 )
167 167 location = Unicode(u'', config=True,
168 168 help="""The external IP or domain name of the Controller, used for disambiguating
169 169 engine and client connections.""",
170 170 )
171 171 import_statements = List([], config=True,
172 172 help="import statements to be run at startup. Necessary in some environments"
173 173 )
174 174
175 175 use_threads = Bool(False, config=True,
176 176 help='Use threads instead of processes for the schedulers',
177 )
177 )
178
179 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
180 help="JSON filename where engine connection info will be stored.")
181 client_json_file = Unicode('ipcontroller-client.json', config=True,
182 help="JSON filename where client connection info will be stored.")
183
184 def _cluster_id_changed(self, name, old, new):
185 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
186 self.engine_json_file = "%s-engine.json" % self.name
187 self.client_json_file = "%s-client.json" % self.name
188
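# With the override above, a cluster_id renames both connection files to
# match the app name (illustrative)::
#
#     ipcontroller --cluster-id=prod
#     # -> security/ipcontroller-prod-engine.json
#     # -> security/ipcontroller-prod-client.json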
178 189
179 190 # internal
180 191 children = List()
181 192 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
182 193
183 194 def _use_threads_changed(self, name, old, new):
184 195 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
185 196
186 197 aliases = Dict(aliases)
187 198 flags = Dict(flags)
188 199
189 200
190 201 def save_connection_dict(self, fname, cdict):
191 202 """save a connection dict to json file."""
192 203 c = self.config
193 204 url = cdict['url']
194 205 location = cdict['location']
195 206 if not location:
196 207 try:
197 208 proto,ip,port = split_url(url)
198 209 except AssertionError:
199 210 pass
200 211 else:
201 212 try:
202 213 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
203 214 except (socket.gaierror, IndexError):
204 215 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
205 216 " You may need to specify '--location=<external_ip_address>' to help"
206 217 " IPython decide when to connect via loopback.")
207 218 location = '127.0.0.1'
208 219 cdict['location'] = location
209 220 fname = os.path.join(self.profile_dir.security_dir, fname)
210 221 with open(fname, 'wb') as f:
211 222 f.write(json.dumps(cdict, indent=2))
212 223 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
213 224
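# A connection dict written above looks roughly like this on disk
# (values illustrative)::
#
#     {
#       "exec_key": "04752f40-...",
#       "ssh": "",
#       "url": "tcp://127.0.0.1:55555",
#       "location": "10.0.0.5"
#     }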
214 225 def load_config_from_json(self):
215 226 """load config from existing json connector files."""
216 227 c = self.config
217 228 # load from engine config
218 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
229 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
219 230 cfg = json.loads(f.read())
220 231 key = c.Session.key = asbytes(cfg['exec_key'])
221 232 xport,addr = cfg['url'].split('://')
222 233 c.HubFactory.engine_transport = xport
223 234 ip,ports = addr.split(':')
224 235 c.HubFactory.engine_ip = ip
225 236 c.HubFactory.regport = int(ports)
226 237 self.location = cfg['location']
227 238 if not self.engine_ssh_server:
228 239 self.engine_ssh_server = cfg['ssh']
229 240 # load client config
230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
241 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
231 242 cfg = json.loads(f.read())
232 243 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
233 244 xport,addr = cfg['url'].split('://')
234 245 c.HubFactory.client_transport = xport
235 246 ip,ports = addr.split(':')
236 247 c.HubFactory.client_ip = ip
237 248 if not self.ssh_server:
238 249 self.ssh_server = cfg['ssh']
239 250 assert int(ports) == c.HubFactory.regport, "regport mismatch"
240 251
241 252 def init_hub(self):
242 253 c = self.config
243 254
244 255 self.do_import_statements()
245 256 reusing = self.reuse_files
246 257 if reusing:
247 258 try:
248 259 self.load_config_from_json()
249 260 except (AssertionError,IOError):
250 261 reusing=False
251 262 # check again, because reusing may have failed:
252 263 if reusing:
253 264 pass
254 265 elif self.secure:
255 266 key = str(uuid.uuid4())
256 267 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
257 268 # with open(keyfile, 'w') as f:
258 269 # f.write(key)
259 270 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
260 271 c.Session.key = asbytes(key)
261 272 else:
262 273 key = c.Session.key = b''
263 274
264 275 try:
265 276 self.factory = HubFactory(config=c, log=self.log)
266 277 # self.start_logging()
267 278 self.factory.init_hub()
268 279 except:
269 280 self.log.error("Couldn't construct the Controller", exc_info=True)
270 281 self.exit(1)
271 282
272 283 if not reusing:
273 284 # save to new json config files
274 285 f = self.factory
275 286 cdict = {'exec_key' : key,
276 287 'ssh' : self.ssh_server,
277 288 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
278 289 'location' : self.location
279 290 }
280 self.save_connection_dict('ipcontroller-client.json', cdict)
291 self.save_connection_dict(self.client_json_file, cdict)
281 292 edict = cdict
282 293 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 294 edict['ssh'] = self.engine_ssh_server
284 self.save_connection_dict('ipcontroller-engine.json', edict)
295 self.save_connection_dict(self.engine_json_file, edict)
285 296
286 297 #
287 298 def init_schedulers(self):
288 299 children = self.children
289 300 mq = import_item(str(self.mq_class))
290 301
291 302 hub = self.factory
292 303 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
293 304 # IOPub relay (in a Process)
294 305 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
295 306 q.bind_in(hub.client_info['iopub'])
296 307 q.bind_out(hub.engine_info['iopub'])
297 308 q.setsockopt_out(zmq.SUBSCRIBE, b'')
298 309 q.connect_mon(hub.monitor_url)
299 310 q.daemon=True
300 311 children.append(q)
301 312
302 313 # Multiplexer Queue (in a Process)
303 314 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
304 315 q.bind_in(hub.client_info['mux'])
305 316 q.setsockopt_in(zmq.IDENTITY, b'mux')
306 317 q.bind_out(hub.engine_info['mux'])
307 318 q.connect_mon(hub.monitor_url)
308 319 q.daemon=True
309 320 children.append(q)
310 321
311 322 # Control Queue (in a Process)
312 323 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
313 324 q.bind_in(hub.client_info['control'])
314 325 q.setsockopt_in(zmq.IDENTITY, b'control')
315 326 q.bind_out(hub.engine_info['control'])
316 327 q.connect_mon(hub.monitor_url)
317 328 q.daemon=True
318 329 children.append(q)
319 330 try:
320 331 scheme = self.config.TaskScheduler.scheme_name
321 332 except AttributeError:
322 333 scheme = TaskScheduler.scheme_name.get_default_value()
323 334 # Task Queue (in a Process)
324 335 if scheme == 'pure':
325 336 self.log.warn("task::using pure XREQ Task scheduler")
326 337 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
327 338 # q.setsockopt_out(zmq.HWM, hub.hwm)
328 339 q.bind_in(hub.client_info['task'][1])
329 340 q.setsockopt_in(zmq.IDENTITY, b'task')
330 341 q.bind_out(hub.engine_info['task'])
331 342 q.connect_mon(hub.monitor_url)
332 343 q.daemon=True
333 344 children.append(q)
334 345 elif scheme == 'none':
335 346 self.log.warn("task::using no Task scheduler")
336 347
337 348 else:
338 349 self.log.info("task::using Python %s Task scheduler"%scheme)
339 350 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
340 351 hub.monitor_url, hub.client_info['notification'])
341 352 kwargs = dict(logname='scheduler', loglevel=self.log_level,
342 353 log_url = self.log_url, config=dict(self.config))
343 354 if 'Process' in self.mq_class:
344 355 # run the Python scheduler in a Process
345 356 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
346 357 q.daemon=True
347 358 children.append(q)
348 359 else:
349 360 # single-threaded Controller
350 361 kwargs['in_thread'] = True
351 362 launch_scheduler(*sargs, **kwargs)
352 363
353 364
354 365 def save_urls(self):
355 366 """save the registration urls to files."""
356 367 c = self.config
357 368
358 369 sec_dir = self.profile_dir.security_dir
359 370 cf = self.factory
360 371
361 372 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
362 373 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
363 374
364 375 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
365 376 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
366 377
367 378
368 379 def do_import_statements(self):
369 380 statements = self.import_statements
370 381 for s in statements:
371 382 try:
372 383 self.log.info("Executing statement: '%s'" % s)
373 384 exec s in globals(), locals()
374 385 except:
375 386 self.log.error("Error running statement: %s" % s)
376 387
377 388 def forward_logging(self):
378 389 if self.log_url:
379 390 self.log.info("Forwarding logging to %s"%self.log_url)
380 391 context = zmq.Context.instance()
381 392 lsock = context.socket(zmq.PUB)
382 393 lsock.connect(self.log_url)
383 394 handler = PUBHandler(lsock)
384 395 self.log.removeHandler(self._log_handler)
385 396 handler.root_topic = 'controller'
386 397 handler.setLevel(self.log_level)
387 398 self.log.addHandler(handler)
388 399 self._log_handler = handler
389 400 # #
390 401
391 402 def initialize(self, argv=None):
392 403 super(IPControllerApp, self).initialize(argv)
393 404 self.forward_logging()
394 405 self.init_hub()
395 406 self.init_schedulers()
396 407
397 408 def start(self):
398 409 # Start the subprocesses:
399 410 self.factory.start()
400 411 child_procs = []
401 412 for child in self.children:
402 413 child.start()
403 414 if isinstance(child, ProcessMonitoredQueue):
404 415 child_procs.append(child.launcher)
405 416 elif isinstance(child, Process):
406 417 child_procs.append(child)
407 418 if child_procs:
408 419 signal_children(child_procs)
409 420
410 421 self.write_pid_file(overwrite=True)
411 422
412 423 try:
413 424 self.factory.loop.start()
414 425 except KeyboardInterrupt:
415 426 self.log.critical("Interrupted, Exiting...\n")
416 427
417 428
418 429
419 430 def launch_new_instance():
420 431 """Create and run the IPython controller"""
421 432 if sys.platform == 'win32':
422 433 # make sure we don't get called from a multiprocessing subprocess
423 434 # this can result in infinite Controllers being started on Windows
424 435 # which doesn't have a proper fork, so multiprocessing is wonky
425 436
426 437 # this only comes up when IPython has been installed using vanilla
427 438 # setuptools, and *not* distribute.
428 439 import multiprocessing
429 440 p = multiprocessing.current_process()
430 441 # the main process has name 'MainProcess'
431 442 # subprocesses will have names like 'Process-1'
432 443 if p.name != 'MainProcess':
433 444 # we are a subprocess, don't start another Controller!
434 445 return
435 446 app = IPControllerApp.instance()
436 447 app.initialize()
437 448 app.start()
438 449
439 450
440 451 if __name__ == '__main__':
441 452 launch_new_instance()
@@ -1,336 +1,344 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 )
38 38 from IPython.zmq.log import EnginePUBHandler
39 39
40 40 from IPython.config.configurable import Configurable
41 41 from IPython.zmq.session import Session
42 42 from IPython.parallel.engine.engine import EngineFactory
43 43 from IPython.parallel.engine.streamkernel import Kernel
44 44 from IPython.parallel.util import disambiguate_url, asbytes
45 45
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Module level variables
52 52 #-----------------------------------------------------------------------------
53 53
54 54 #: The default config file name for this application
55 55 default_config_file_name = u'ipengine_config.py'
56 56
57 57 _description = """Start an IPython engine for parallel computing.
58 58
59 59 IPython engines run in parallel and perform computations on behalf of a client
60 60 and controller. A controller needs to be started before the engines. The
61 61 engine can be configured using command line options or using a cluster
62 62 directory. Cluster directories contain config, log and security files and are
63 63 usually located in your ipython directory and named "profile_name".
64 64 See the `profile` and `profile-dir` options for details.
65 65 """
66 66
67 67 _examples = """
68 68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
69 69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
70 70 """
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # MPI configuration
74 74 #-----------------------------------------------------------------------------
75 75
76 76 mpi4py_init = """from mpi4py import MPI as mpi
77 77 mpi.size = mpi.COMM_WORLD.Get_size()
78 78 mpi.rank = mpi.COMM_WORLD.Get_rank()
79 79 """
80 80
81 81
82 82 pytrilinos_init = """from PyTrilinos import Epetra
83 83 class SimpleStruct:
84 84 pass
85 85 mpi = SimpleStruct()
86 86 mpi.rank = 0
87 87 mpi.size = 0
88 88 """
89 89
90 90 class MPI(Configurable):
91 91 """Configurable for MPI initialization"""
92 92 use = Unicode('', config=True,
93 93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 94 )
95 95
96 def _on_use_changed(self, old, new):
96 def _use_changed(self, name, old, new):
97 97 # load default init script if it's not set
98 98 if not self.init_script:
99 99 self.init_script = self.default_inits.get(new, '')
100 100
101 101 init_script = Unicode('', config=True,
102 102 help="Initialization code for MPI")
103 103
104 104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
105 105 config=True)
106 106
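A usage sketch for the MPI configurable above; the command-line form relies on the `mpi = 'MPI.use'` alias defined just below:

    # in ipengine_config.py
    c = get_config()
    c.MPI.use = 'mpi4py'   # _use_changed copies mpi4py_init into init_script

Setting `use` only fills `init_script` when the user has not supplied one, per the handler above.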
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Main application
110 110 #-----------------------------------------------------------------------------
111 111 aliases = dict(
112 112 file = 'IPEngineApp.url_file',
113 113 c = 'IPEngineApp.startup_command',
114 114 s = 'IPEngineApp.startup_script',
115 115
116 116 ident = 'Session.session',
117 117 user = 'Session.username',
118 118 keyfile = 'Session.keyfile',
119 119
120 120 url = 'EngineFactory.url',
121 121 ssh = 'EngineFactory.sshserver',
122 122 sshkey = 'EngineFactory.sshkey',
123 123 ip = 'EngineFactory.ip',
124 124 transport = 'EngineFactory.transport',
125 125 port = 'EngineFactory.regport',
126 126 location = 'EngineFactory.location',
127 127
128 128 timeout = 'EngineFactory.timeout',
129 129
130 130 mpi = 'MPI.use',
131 131
132 132 )
133 133 aliases.update(base_aliases)
134 134
135 135
136 136 class IPEngineApp(BaseParallelApplication):
137 137
138 name = Unicode(u'ipengine')
139 description = Unicode(_description)
138 name = 'ipengine'
139 description = _description
140 140 examples = _examples
141 141 config_file_name = Unicode(default_config_file_name)
142 142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
143 143
144 144 startup_script = Unicode(u'', config=True,
145 145 help='specify a script to be run at startup')
146 146 startup_command = Unicode('', config=True,
147 147 help='specify a command to be run at startup')
148 148
149 149 url_file = Unicode(u'', config=True,
150 150 help="""The full location of the file containing the connection information for
151 151 the controller. If this is not given, the file must be in the
152 152 security directory of the cluster directory. This location is
153 153 resolved using the `profile` or `profile_dir` options.""",
154 154 )
155 155 wait_for_url_file = Float(5, config=True,
156 156 help="""The maximum number of seconds to wait for url_file to exist.
157 157 This is useful for batch-systems and shared-filesystems where the
158 158 controller and engine are started at the same time and it
159 159 may take a moment for the controller to write the connector files.""")
160 160
161 url_file_name = Unicode(u'ipcontroller-engine.json')
161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
162
163 def _cluster_id_changed(self, name, old, new):
164 if new:
165 base = 'ipcontroller-%s' % new
166 else:
167 base = 'ipcontroller'
168 self.url_file_name = "%s-engine.json" % base
169
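A quick illustration of the handler above (the 'prod' id is made up):

    for cluster_id in ('', 'prod'):
        base = 'ipcontroller-%s' % cluster_id if cluster_id else 'ipcontroller'
        print "%s-engine.json" % base
    # ipcontroller-engine.json
    # ipcontroller-prod-engine.json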
162 170 log_url = Unicode('', config=True,
163 171 help="""The URL for the iploggerapp instance, for forwarding
164 172 logging to a central location.""")
165 173
166 174 aliases = Dict(aliases)
167 175
168 176 # def find_key_file(self):
169 177 # """Set the key file.
170 178 #
171 179 # Here we don't try to actually see if it exists or is valid, as that
172 180 # is handled by the connection logic.
173 181 # """
174 182 # config = self.master_config
175 183 # # Find the actual controller key file
176 184 # if not config.Global.key_file:
177 185 # try_this = os.path.join(
178 186 # config.Global.profile_dir,
179 187 # config.Global.security_dir,
180 188 # config.Global.key_file_name
181 189 # )
182 190 # config.Global.key_file = try_this
183 191
184 192 def find_url_file(self):
185 193 """Set the url file.
186 194
187 195 Here we don't try to actually see if it exists or is valid, as that
188 196 is handled by the connection logic.
189 197 """
190 198 config = self.config
191 199 # Find the actual controller key file
192 200 if not self.url_file:
193 201 self.url_file = os.path.join(
194 202 self.profile_dir.security_dir,
195 203 self.url_file_name
196 204 )
197 205
198 206 def load_connector_file(self):
199 207 """load config from a JSON connector file,
200 208 at a *lower* priority than command-line/config files.
201 209 """
202 210
203 211 self.log.info("Loading url_file %r"%self.url_file)
204 212 config = self.config
205 213
206 214 with open(self.url_file) as f:
207 215 d = json.loads(f.read())
208 216
209 217 try:
210 218 config.Session.key
211 219 except AttributeError:
212 220 if d['exec_key']:
213 221 config.Session.key = asbytes(d['exec_key'])
214 222
215 223 try:
216 224 config.EngineFactory.location
217 225 except AttributeError:
218 226 config.EngineFactory.location = d['location']
219 227
220 228 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
221 229 try:
222 230 config.EngineFactory.url
223 231 except AttributeError:
224 232 config.EngineFactory.url = d['url']
225 233
226 234 try:
227 235 config.EngineFactory.sshserver
228 236 except AttributeError:
229 237 config.EngineFactory.sshserver = d['ssh']
230 238
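For reference, the file loaded here is the one written by ipcontroller's save_connection_dict earlier in this diff. A minimal illustrative payload, with every value made up:

    # shape of ipcontroller-engine.json (illustrative values only)
    d = {
        'exec_key' : u'a1b2c3d4-...',          # becomes Session.key if not already set
        'ssh'      : u'',                       # ssh server for tunneling, may be empty
        'url'      : u'tcp://10.0.0.1:10101',   # registration url, fixed up by disambiguate_url
        'location' : u'10.0.0.1',               # ip/hostname where the controller runs
    }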
231 239 def init_engine(self):
232 240 # This is the working dir by now.
233 241 sys.path.insert(0, '')
234 242 config = self.config
235 243 # print config
236 244 self.find_url_file()
237 245
238 246 # was the url manually specified?
239 247 keys = set(self.config.EngineFactory.keys())
240 248 keys = keys.union(set(self.config.RegistrationFactory.keys()))
241 249
242 250 if keys.intersection(set(['ip', 'url', 'port'])):
243 251 # Connection info was specified, don't wait for the file
244 252 url_specified = True
245 253 self.wait_for_url_file = 0
246 254 else:
247 255 url_specified = False
248 256
249 257 if self.wait_for_url_file and not os.path.exists(self.url_file):
250 258 self.log.warn("url_file %r not found"%self.url_file)
251 259 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
252 260 tic = time.time()
253 261 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
254 262 # wait for url_file to exist, for up to wait_for_url_file seconds
255 263 time.sleep(0.1)
256 264
257 265 if os.path.exists(self.url_file):
258 266 self.load_connector_file()
259 267 elif not url_specified:
260 268 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
261 269 self.exit(1)
262 270
263 271
264 272 try:
265 273 exec_lines = config.Kernel.exec_lines
266 274 except AttributeError:
267 275 config.Kernel.exec_lines = []
268 276 exec_lines = config.Kernel.exec_lines
269 277
270 278 if self.startup_script:
271 279 enc = sys.getfilesystemencoding() or 'utf8'
272 280 cmd="execfile(%r)"%self.startup_script.encode(enc)
273 281 exec_lines.append(cmd)
274 282 if self.startup_command:
275 283 exec_lines.append(self.startup_command)
276 284
277 285 # Create the underlying shell class and Engine
278 286 # shell_class = import_item(self.master_config.Global.shell_class)
279 287 # print self.config
280 288 try:
281 289 self.engine = EngineFactory(config=config, log=self.log)
282 290 except:
283 291 self.log.error("Couldn't start the Engine", exc_info=True)
284 292 self.exit(1)
285 293
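A config sketch for the startup hooks consumed above; both values are illustrative and end up appended to Kernel.exec_lines:

    # in ipengine_config.py
    c = get_config()
    c.IPEngineApp.startup_script = u'/home/me/engine_setup.py'   # hypothetical path
    c.IPEngineApp.startup_command = 'import numpy'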
286 294 def forward_logging(self):
287 295 if self.log_url:
288 296 self.log.info("Forwarding logging to %s"%self.log_url)
289 297 context = self.engine.context
290 298 lsock = context.socket(zmq.PUB)
291 299 lsock.connect(self.log_url)
292 300 self.log.removeHandler(self._log_handler)
293 301 handler = EnginePUBHandler(self.engine, lsock)
294 302 handler.setLevel(self.log_level)
295 303 self.log.addHandler(handler)
296 304 self._log_handler = handler
297 305
298 306 def init_mpi(self):
299 307 global mpi
300 308 self.mpi = MPI(config=self.config)
301 309
302 310 mpi_import_statement = self.mpi.init_script
303 311 if mpi_import_statement:
304 312 try:
305 313 self.log.info("Initializing MPI:")
306 314 self.log.info(mpi_import_statement)
307 315 exec mpi_import_statement in globals()
308 316 except:
309 317 mpi = None
310 318 else:
311 319 mpi = None
312 320
313 321 def initialize(self, argv=None):
314 322 super(IPEngineApp, self).initialize(argv)
315 323 self.init_mpi()
316 324 self.init_engine()
317 325 self.forward_logging()
318 326
319 327 def start(self):
320 328 self.engine.start()
321 329 try:
322 330 self.engine.loop.start()
323 331 except KeyboardInterrupt:
324 332 self.log.critical("Engine Interrupted, shutting down...\n")
325 333
326 334
327 335 def launch_new_instance():
328 336 """Create and run the IPython engine"""
329 337 app = IPEngineApp.instance()
330 338 app.initialize()
331 339 app.start()
332 340
333 341
334 342 if __name__ == '__main__':
335 343 launch_new_instance()
336 344
@@ -1,1152 +1,1176 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import re
26 26 import stat
27 27 import time
28 28
29 29 # signal imports, handling various platforms, versions
30 30
31 31 from signal import SIGINT, SIGTERM
32 32 try:
33 33 from signal import SIGKILL
34 34 except ImportError:
35 35 # Windows
36 36 SIGKILL=SIGTERM
37 37
38 38 try:
39 39 # Windows >= 2.7, 3.2
40 40 from signal import CTRL_C_EVENT as SIGINT
41 41 except ImportError:
42 42 pass
43 43
44 44 from subprocess import Popen, PIPE, STDOUT
45 45 try:
46 46 from subprocess import check_output
47 47 except ImportError:
48 48 # pre-2.7, define check_output with Popen
49 49 def check_output(*args, **kwargs):
50 50 kwargs.update(dict(stdout=PIPE))
51 51 p = Popen(*args, **kwargs)
52 52 out,err = p.communicate()
53 53 return out
54 54
55 55 from zmq.eventloop import ioloop
56 56
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
61 63 from IPython.utils.path import get_ipython_module_path
62 64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63 65
64 66 from .win32support import forward_read_events
65 67
66 68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67 69
68 70 WINDOWS = os.name == 'nt'
69 71
70 72 #-----------------------------------------------------------------------------
71 73 # Paths to the kernel apps
72 74 #-----------------------------------------------------------------------------
73 75
74 76
75 77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
76 78 'IPython.parallel.apps.ipclusterapp'
77 79 ))
78 80
79 81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 82 'IPython.parallel.apps.ipengineapp'
81 83 ))
82 84
83 85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
84 86 'IPython.parallel.apps.ipcontrollerapp'
85 87 ))
86 88
87 89 #-----------------------------------------------------------------------------
88 90 # Base launchers and errors
89 91 #-----------------------------------------------------------------------------
90 92
91 93
92 94 class LauncherError(Exception):
93 95 pass
94 96
95 97
96 98 class ProcessStateError(LauncherError):
97 99 pass
98 100
99 101
100 102 class UnknownStatus(LauncherError):
101 103 pass
102 104
103 105
104 106 class BaseLauncher(LoggingConfigurable):
105 107 """An asbtraction for starting, stopping and signaling a process."""
106 108
107 109 # In all of the launchers, the work_dir is where child processes will be
108 110 # run. This will usually be the profile_dir, but may not be. any work_dir
109 111 # passed into the __init__ method will override the config value.
110 112 # This should not be used to set the work_dir for the actual engine
111 113 # and controller. Instead, use their own config files or the
112 114 # controller_args, engine_args attributes of the launchers to add
113 115 # the work_dir option.
114 116 work_dir = Unicode(u'.')
115 117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
116 118
117 119 start_data = Any()
118 120 stop_data = Any()
119 121
120 122 def _loop_default(self):
121 123 return ioloop.IOLoop.instance()
122 124
123 125 def __init__(self, work_dir=u'.', config=None, **kwargs):
124 126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
125 127 self.state = 'before' # can be before, running, after
126 128 self.stop_callbacks = []
127 129 self.start_data = None
128 130 self.stop_data = None
129 131
130 132 @property
131 133 def args(self):
132 134 """A list of cmd and args that will be used to start the process.
133 135
134 136 This is what is passed to :func:`spawnProcess` and the first element
135 137 will be the process name.
136 138 """
137 139 return self.find_args()
138 140
139 141 def find_args(self):
140 142 """The ``.args`` property calls this to find the args list.
141 143
142 144 Subclasses should implement this to construct the cmd and args.
143 145 """
144 146 raise NotImplementedError('find_args must be implemented in a subclass')
145 147
146 148 @property
147 149 def arg_str(self):
148 150 """The string form of the program arguments."""
149 151 return ' '.join(self.args)
150 152
151 153 @property
152 154 def running(self):
153 155 """Am I running."""
154 156 if self.state == 'running':
155 157 return True
156 158 else:
157 159 return False
158 160
159 161 def start(self):
160 162 """Start the process."""
161 163 raise NotImplementedError('start must be implemented in a subclass')
162 164
163 165 def stop(self):
164 166 """Stop the process and notify observers of stopping.
165 167
166 168 This method will return None immediately.
167 169 To observe the actual process stopping, see :meth:`on_stop`.
168 170 """
169 171 raise NotImplementedError('stop must be implemented in a subclass')
170 172
171 173 def on_stop(self, f):
172 174 """Register a callback to be called with this Launcher's stop_data
173 175 when the process actually finishes.
174 176 """
175 177 if self.state=='after':
176 178 return f(self.stop_data)
177 179 else:
178 180 self.stop_callbacks.append(f)
179 181
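A minimal sketch of the stop-callback protocol above; if the launcher has already reached the 'after' state, the callback fires immediately with stop_data:

    def report_exit(data):
        # for LocalProcessLauncher, data is dict(exit_code=..., pid=...)
        print "launcher stopped: %r" % data

    launcher.on_stop(report_exit)   # `launcher` is any started BaseLauncher subclass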
180 182 def notify_start(self, data):
181 183 """Call this to trigger startup actions.
182 184
183 185 This logs the process startup and sets the state to 'running'. It is
184 186 a pass-through so it can be used as a callback.
185 187 """
186 188
187 189 self.log.info('Process %r started: %r' % (self.args[0], data))
188 190 self.start_data = data
189 191 self.state = 'running'
190 192 return data
191 193
192 194 def notify_stop(self, data):
193 195 """Call this to trigger process stop actions.
194 196
195 197 This logs the process stopping and sets the state to 'after'. Call
196 198 this to trigger callbacks registered via :meth:`on_stop`."""
197 199
198 200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 201 self.stop_data = data
200 202 self.state = 'after'
201 203 for i in range(len(self.stop_callbacks)):
202 204 d = self.stop_callbacks.pop()
203 205 d(data)
204 206 return data
205 207
206 208 def signal(self, sig):
207 209 """Signal the process.
208 210
209 211 Parameters
210 212 ----------
211 213 sig : str or int
212 214 'KILL', 'INT', etc., or any signal number
213 215 """
214 216 raise NotImplementedError('signal must be implemented in a subclass')
215 217
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
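Illustrating the mixin: assigning either trait rebuilds cluster_args from both current values (the values here are made up):

    m = ClusterAppMixin()
    m.profile_dir = u'/home/me/.ipython/profile_default'
    m.cluster_id = u'prod'
    print m.cluster_args
    # ['--profile-dir', u'/home/me/.ipython/profile_default', '--cluster-id', u'prod']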
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
237
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
244 )
216 245
217 246 #-----------------------------------------------------------------------------
218 247 # Local process launchers
219 248 #-----------------------------------------------------------------------------
220 249
221 250
222 251 class LocalProcessLauncher(BaseLauncher):
223 252 """Start and stop an external process in an asynchronous manner.
224 253
225 254 This will launch the external process with a working directory of
226 255 ``self.work_dir``.
227 256 """
228 257
229 258 # This is used to construct self.args, which is passed to
230 259 # spawnProcess.
231 260 cmd_and_args = List([])
232 261 poll_frequency = Int(100) # in ms
233 262
234 263 def __init__(self, work_dir=u'.', config=None, **kwargs):
235 264 super(LocalProcessLauncher, self).__init__(
236 265 work_dir=work_dir, config=config, **kwargs
237 266 )
238 267 self.process = None
239 268 self.poller = None
240 269
241 270 def find_args(self):
242 271 return self.cmd_and_args
243 272
244 273 def start(self):
245 274 if self.state == 'before':
246 275 self.process = Popen(self.args,
247 276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
248 277 env=os.environ,
249 278 cwd=self.work_dir
250 279 )
251 280 if WINDOWS:
252 281 self.stdout = forward_read_events(self.process.stdout)
253 282 self.stderr = forward_read_events(self.process.stderr)
254 283 else:
255 284 self.stdout = self.process.stdout.fileno()
256 285 self.stderr = self.process.stderr.fileno()
257 286 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
258 287 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
259 288 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
260 289 self.poller.start()
261 290 self.notify_start(self.process.pid)
262 291 else:
263 292 s = 'The process was already started and has state: %r' % self.state
264 293 raise ProcessStateError(s)
265 294
266 295 def stop(self):
267 296 return self.interrupt_then_kill()
268 297
269 298 def signal(self, sig):
270 299 if self.state == 'running':
271 300 if WINDOWS and sig != SIGINT:
272 301 # use Windows tree-kill for better child cleanup
273 302 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
274 303 else:
275 304 self.process.send_signal(sig)
276 305
277 306 def interrupt_then_kill(self, delay=2.0):
278 307 """Send INT, wait a delay and then send KILL."""
279 308 try:
280 309 self.signal(SIGINT)
281 310 except Exception:
282 311 self.log.debug("interrupt failed")
283 312 pass
284 313 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
285 314 self.killer.start()
286 315
287 316 # callbacks, etc:
288 317
289 318 def handle_stdout(self, fd, events):
290 319 if WINDOWS:
291 320 line = self.stdout.recv()
292 321 else:
293 322 line = self.process.stdout.readline()
294 323 # a stopped process will be readable but return empty strings
295 324 if line:
296 325 self.log.info(line[:-1])
297 326 else:
298 327 self.poll()
299 328
300 329 def handle_stderr(self, fd, events):
301 330 if WINDOWS:
302 331 line = self.stderr.recv()
303 332 else:
304 333 line = self.process.stderr.readline()
305 334 # a stopped process will be readable but return empty strings
306 335 if line:
307 336 self.log.error(line[:-1])
308 337 else:
309 338 self.poll()
310 339
311 340 def poll(self):
312 341 status = self.process.poll()
313 342 if status is not None:
314 343 self.poller.stop()
315 344 self.loop.remove_handler(self.stdout)
316 345 self.loop.remove_handler(self.stderr)
317 346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 347 return status
319 348
320 class LocalControllerLauncher(LocalProcessLauncher):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
321 350 """Launch a controller as a regular external process."""
322 351
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 help="""Popen command to launch ipcontroller.""")
325 # Command line arguments to ipcontroller.
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
327 help="""command-line args to pass to ipcontroller""")
328
329 352 def find_args(self):
330 return self.controller_cmd + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
331 354
332 def start(self, profile_dir):
355 def start(self):
333 356 """Start the controller by profile_dir."""
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
335 self.profile_dir = unicode(profile_dir)
336 357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 358 return super(LocalControllerLauncher, self).start()
338 359
339 360
340 class LocalEngineLauncher(LocalProcessLauncher):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
341 362 """Launch a single engine as a regular externall process."""
342 363
343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 help="""command to launch the Engine.""")
345 # Command line arguments for ipengine.
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
347 help="command-line arguments to pass to ipengine"
348 )
349
350 364 def find_args(self):
351 return self.engine_cmd + self.engine_args
365 return self.engine_cmd + self.cluster_args + self.engine_args
352 366
353 def start(self, profile_dir):
354 """Start the engine by profile_dir."""
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
356 self.profile_dir = unicode(profile_dir)
357 return super(LocalEngineLauncher, self).start()
358 367
359
360 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(LocalEngineLauncher):
361 369 """Launch a set of engines as regular external processes."""
362 370
363 # Command line arguments for ipengine.
364 engine_args = List(
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
366 help="command-line arguments to pass to ipengine"
367 )
368 371 delay = CFloat(0.1, config=True,
369 372 help="""delay (in seconds) between starting each engine after the first.
370 373 This can help force the engines to get their ids in order, or limit
371 374 process flood when starting many engines."""
372 375 )
373 376
374 377 # launcher class
375 378 launcher_class = LocalEngineLauncher
376 379
377 380 launchers = Dict()
378 381 stop_data = Dict()
379 382
380 383 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 384 super(LocalEngineSetLauncher, self).__init__(
382 385 work_dir=work_dir, config=config, **kwargs
383 386 )
384 387 self.stop_data = {}
385 388
386 def start(self, n, profile_dir):
389 def start(self, n):
387 390 """Start n engines by profile or profile_dir."""
388 self.profile_dir = unicode(profile_dir)
389 391 dlist = []
390 392 for i in range(n):
391 393 if i > 0:
392 394 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
398
394 399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
395 401 el.engine_args = copy.deepcopy(self.engine_args)
396 402 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
403 d = el.start()
398 404 if i==0:
399 405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 406 self.launchers[i] = el
401 407 dlist.append(d)
402 408 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
406 409 return dlist
407 410
408 411 def find_args(self):
409 412 return ['engine set']
410 413
411 414 def signal(self, sig):
412 415 dlist = []
413 416 for el in self.launchers.itervalues():
414 417 d = el.signal(sig)
415 418 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 419 return dlist
418 420
419 421 def interrupt_then_kill(self, delay=1.0):
420 422 dlist = []
421 423 for el in self.launchers.itervalues():
422 424 d = el.interrupt_then_kill(delay)
423 425 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 426 return dlist
426 427
427 428 def stop(self):
428 429 return self.interrupt_then_kill()
429 430
430 431 def _notice_engine_stopped(self, data):
431 432 pid = data['pid']
432 433 for idx,el in self.launchers.iteritems():
433 434 if el.process.pid == pid:
434 435 break
435 436 self.launchers.pop(idx)
436 437 self.stop_data[idx] = data
437 438 if not self.launchers:
438 439 self.notify_stop(self.stop_data)
439 440
440 441
441 442 #-----------------------------------------------------------------------------
442 443 # MPIExec launchers
443 444 #-----------------------------------------------------------------------------
444 445
445 446
446 447 class MPIExecLauncher(LocalProcessLauncher):
447 448 """Launch an external process using mpiexec."""
448 449
449 450 mpi_cmd = List(['mpiexec'], config=True,
450 451 help="The mpiexec command to use in starting the process."
451 452 )
452 453 mpi_args = List([], config=True,
453 454 help="The command line arguments to pass to mpiexec."
454 455 )
455 program = List(['date'], config=True,
456 program = List(['date'],
456 457 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 program_args = List([],
458 459 help="The command line argument to the program."
459 460 )
460 461 n = Int(1)
461 462
462 463 def find_args(self):
463 464 """Build self.args using all the fields."""
464 465 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 466 self.program + self.program_args
466 467
467 468 def start(self, n):
468 469 """Start n instances of the program using mpiexec."""
469 470 self.n = n
470 471 return super(MPIExecLauncher, self).start()
471 472
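With the defaults above and n=4, find_args assembles roughly `mpiexec -n 4 date`. A config sketch for passing extra mpiexec flags; the hostfile name is hypothetical:

    # in ipcluster_config.py
    c = get_config()
    c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
    c.MPIExecEngineSetLauncher.mpi_args = ['--hostfile', 'hosts.txt']   # hypothetical file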
472 473
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 475 """Launch a controller using mpiexec."""
475 476
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 help="Popen command to launch the Contropper"
478 )
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
481 )
482 n = Int(1)
477 # alias back to *non-configurable* program[_args] for use in find_args()
478 # this way all Controller/EngineSetLaunchers have the same form, rather
479 # than *some* having `program_args` and others `controller_args`
480 @property
481 def program(self):
482 return self.controller_cmd
483
484 @property
485 def program_args(self):
486 return self.cluster_args + self.controller_args
483 487
484 def start(self, profile_dir):
488 def start(self):
485 489 """Start the controller by profile_dir."""
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 491 return super(MPIExecControllerLauncher, self).start(1)
490 492
491 def find_args(self):
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
494
495 493
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 """Launch engines using mpiexec"""
497 496
498 program = List(ipengine_cmd_argv, config=True,
499 help="Popen command for ipengine"
500 )
501 program_args = List(
502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
504 )
505 n = Int(1)
497 # alias back to *non-configurable* program[_args] for use in find_args()
498 # this way all Controller/EngineSetLaunchers have the same form, rather
499 # than *some* having `program_args` and others `controller_args`
500 @property
501 def program(self):
502 return self.engine_cmd
503
504 @property
505 def program_args(self):
506 return self.cluster_args + self.engine_args
506 507
507 def start(self, n, profile_dir):
508 def start(self, n):
508 509 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 510 self.n = n
512 511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 512 return super(MPIExecEngineSetLauncher, self).start(n)
514 513
515 514 #-----------------------------------------------------------------------------
516 515 # SSH launchers
517 516 #-----------------------------------------------------------------------------
518 517
519 518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
520 519
521 520 class SSHLauncher(LocalProcessLauncher):
522 521 """A minimal launcher for ssh.
523 522
524 523 To be useful this will probably have to be extended to use the ``sshx``
525 524 idea for environment variables. There could be other things this needs
526 525 as well.
527 526 """
528 527
529 528 ssh_cmd = List(['ssh'], config=True,
530 529 help="command for starting ssh")
531 530 ssh_args = List(['-tt'], config=True,
532 531 help="args to pass to ssh")
533 program = List(['date'], config=True,
532 program = List(['date'],
534 533 help="Program to launch via ssh")
535 program_args = List([], config=True,
534 program_args = List([],
536 535 help="args to pass to remote program")
537 536 hostname = Unicode('', config=True,
538 537 help="hostname on which to launch the program")
539 538 user = Unicode('', config=True,
540 539 help="username for ssh")
541 540 location = Unicode('', config=True,
542 541 help="user@hostname location for ssh in one setting")
543 542
544 543 def _hostname_changed(self, name, old, new):
545 544 if self.user:
546 545 self.location = u'%s@%s' % (self.user, new)
547 546 else:
548 547 self.location = new
549 548
550 549 def _user_changed(self, name, old, new):
551 550 self.location = u'%s@%s' % (new, self.hostname)
552 551
553 552 def find_args(self):
554 553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 554 self.program + self.program_args
556 555
557 def start(self, profile_dir, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
556 def start(self, hostname=None, user=None):
559 557 if hostname is not None:
560 558 self.hostname = hostname
561 559 if user is not None:
562 560 self.user = user
563 561
564 562 return super(SSHLauncher, self).start()
565 563
566 564 def signal(self, sig):
567 565 if self.state == 'running':
568 566 # send escaped ssh connection-closer
569 567 self.process.stdin.write('~.')
570 568 self.process.stdin.flush()
571 569
572 570
573 571
574 class SSHControllerLauncher(SSHLauncher):
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
575 573
576 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
574 # alias back to *non-configurable* program[_args] for use in find_args()
575 # this way all Controller/EngineSetLaunchers have the same form, rather
576 # than *some* having `program_args` and others `controller_args`
577 @property
578 def program(self):
579 return self.controller_cmd
580
581 @property
582 def program_args(self):
583 return self.cluster_args + self.controller_args
580 584
581 585
582 class SSHEngineLauncher(SSHLauncher):
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
585 # Command line arguments for ipengine.
586 program_args = List(
587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
589 )
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
587
588 # alias back to *non-configurable* program[_args] for use in find_args()
589 # this way all Controller/EngineSetLaunchers have the same form, rather
590 # than *some* having `program_args` and others `controller_args`
591 @property
592 def program(self):
593 return self.engine_cmd
594
595 @property
596 def program_args(self):
597 return self.cluster_args + self.engine_args
598
590 599
591 600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 601 launcher_class = SSHEngineLauncher
593 602 engines = Dict(config=True,
594 603 help="""dict of engines to launch. This is a dict by hostname of ints,
595 604 corresponding to the number of engines to start on that host.""")
596 605
597 def start(self, n, profile_dir):
606 def start(self, n):
598 607 """Start engines by profile or profile_dir.
599 608 `n` is ignored, and the `engines` config property is used instead.
600 609 """
601 610
602 self.profile_dir = unicode(profile_dir)
603 611 dlist = []
604 612 for host, n in self.engines.iteritems():
605 613 if isinstance(n, (tuple, list)):
606 614 n, args = n
607 615 else:
608 616 args = copy.deepcopy(self.engine_args)
609 617
610 618 if '@' in host:
611 619 user,host = host.split('@',1)
612 620 else:
613 621 user=None
614 622 for i in range(n):
615 623 if i > 0:
616 624 time.sleep(self.delay)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 )
618 628
619 629 # Copy the engine args over to each engine launcher.
620 i
621 el.program_args = args
630 el.engine_cmd = self.engine_cmd
631 el.engine_args = args
622 632 el.on_stop(self._notice_engine_stopped)
623 d = el.start(profile_dir, user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
624 634 if i==0:
625 635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
626 636 self.launchers[host+str(i)] = el
627 637 dlist.append(d)
628 638 self.notify_start(dlist)
629 639 return dlist
630 640
631 641
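A config sketch for SSHEngineSetLauncher's `engines` dict above; hostnames are illustrative. A plain int is an engine count, a (n, args) tuple also overrides engine_args for that host, and a 'user@host' key sets the ssh user:

    # in ipcluster_config.py
    c = get_config()
    c.SSHEngineSetLauncher.engines = {
        'node1.example.com'    : 2,
        'me@node2.example.com' : (4, ['--log-level=20']),
    }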
632 642
633 643 #-----------------------------------------------------------------------------
634 644 # Windows HPC Server 2008 scheduler launchers
635 645 #-----------------------------------------------------------------------------
636 646
637 647
638 648 # This is only used on Windows.
639 649 def find_job_cmd():
640 650 if WINDOWS:
641 651 try:
642 652 return find_cmd('job')
643 653 except (FindCmdError, ImportError):
644 654 # ImportError will be raised if win32api is not installed
645 655 return 'job'
646 656 else:
647 657 return 'job'
648 658
649 659
650 660 class WindowsHPCLauncher(BaseLauncher):
651 661
652 662 job_id_regexp = Unicode(r'\d+', config=True,
653 663 help="""A regular expression used to get the job id from the output of the
654 664 submit_command. """
655 665 )
656 666 job_file_name = Unicode(u'ipython_job.xml', config=True,
657 667 help="The filename of the instantiated job script.")
658 668 # The full path to the instantiated job script. This gets made dynamically
659 669 # by combining the work_dir with the job_file_name.
660 670 job_file = Unicode(u'')
661 671 scheduler = Unicode('', config=True,
662 672 help="The hostname of the scheduler to submit the job to.")
663 673 job_cmd = Unicode(find_job_cmd(), config=True,
664 674 help="The command for submitting jobs.")
665 675
666 676 def __init__(self, work_dir=u'.', config=None, **kwargs):
667 677 super(WindowsHPCLauncher, self).__init__(
668 678 work_dir=work_dir, config=config, **kwargs
669 679 )
670 680
671 681 @property
672 682 def job_file(self):
673 683 return os.path.join(self.work_dir, self.job_file_name)
674 684
675 685 def write_job_file(self, n):
676 686 raise NotImplementedError("Implement write_job_file in a subclass.")
677 687
678 688 def find_args(self):
679 689 return [u'job.exe']
680 690
681 691 def parse_job_id(self, output):
682 692 """Take the output of the submit command and return the job id."""
683 693 m = re.search(self.job_id_regexp, output)
684 694 if m is not None:
685 695 job_id = m.group()
686 696 else:
687 697 raise LauncherError("Job id couldn't be determined: %s" % output)
688 698 self.job_id = job_id
689 699 self.log.info('Job started with job id: %r' % job_id)
690 700 return job_id
691 701
692 702 def start(self, n):
693 703 """Start n copies of the process using the Win HPC job scheduler."""
694 704 self.write_job_file(n)
695 705 args = [
696 706 'submit',
697 707 '/jobfile:%s' % self.job_file,
698 708 '/scheduler:%s' % self.scheduler
699 709 ]
700 710 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
701 711
702 712 output = check_output([self.job_cmd]+args,
703 713 env=os.environ,
704 714 cwd=self.work_dir,
705 715 stderr=STDOUT
706 716 )
707 717 job_id = self.parse_job_id(output)
708 718 self.notify_start(job_id)
709 719 return job_id
710 720
711 721 def stop(self):
712 722 args = [
713 723 'cancel',
714 724 self.job_id,
715 725 '/scheduler:%s' % self.scheduler
716 726 ]
717 727 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
718 728 try:
719 729 output = check_output([self.job_cmd]+args,
720 730 env=os.environ,
721 731 cwd=self.work_dir,
722 732 stderr=STDOUT
723 733 )
724 734 except:
725 735 output = 'The job already appears to be stoppped: %r' % self.job_id
726 736 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
727 737 return output
728 738
729 739
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
731 741
732 742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
733 743 help="WinHPC xml job file.")
734 extra_args = List([], config=False,
744 controller_args = List([], config=False,
735 745 help="extra args to pass to ipcontroller")
736 746
737 747 def write_job_file(self, n):
738 748 job = IPControllerJob(config=self.config)
739 749
740 750 t = IPControllerTask(config=self.config)
741 751 # The tasks work directory is *not* the actual work directory of
742 752 # the controller. It is used as the base path for the stdout/stderr
743 753 # files that the scheduler redirects to.
744 754 t.work_directory = self.profile_dir
745 755 # Add the cluster args (profile_dir, cluster_id) and controller args.
746 t.controller_args.extend(self.extra_args)
756 t.controller_args.extend(self.cluster_args)
757 t.controller_args.extend(self.controller_args)
747 758 job.add_task(t)
748 759
749 760 self.log.info("Writing job description file: %s" % self.job_file)
750 761 job.write(self.job_file)
751 762
752 763 @property
753 764 def job_file(self):
754 765 return os.path.join(self.profile_dir, self.job_file_name)
755 766
756 def start(self, profile_dir):
767 def start(self):
757 768 """Start the controller by profile_dir."""
758 self.extra_args = ['--profile-dir=%s'%profile_dir]
759 self.profile_dir = unicode(profile_dir)
760 769 return super(WindowsHPCControllerLauncher, self).start(1)
761 770
762 771
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
764 773
765 774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
766 775 help="jobfile for ipengines job")
767 extra_args = List([], config=False,
776 engine_args = List([], config=False,
768 777 help="extra args to pas to ipengine")
769 778
770 779 def write_job_file(self, n):
771 780 job = IPEngineSetJob(config=self.config)
772 781
773 782 for i in range(n):
774 783 t = IPEngineTask(config=self.config)
775 784 # The tasks work directory is *not* the actual work directory of
776 785 # the engine. It is used as the base path for the stdout/stderr
777 786 # files that the scheduler redirects to.
778 787 t.work_directory = self.profile_dir
779 788 # Add the cluster args (profile_dir, cluster_id) and engine args.
780 t.engine_args.extend(self.extra_args)
789 t.engine_args.extend(self.cluster_args)
790 t.engine_args.extend(self.engine_args)
781 791 job.add_task(t)
782 792
783 793 self.log.info("Writing job description file: %s" % self.job_file)
784 794 job.write(self.job_file)
785 795
786 796 @property
787 797 def job_file(self):
788 798 return os.path.join(self.profile_dir, self.job_file_name)
789 799
790 def start(self, n, profile_dir):
800 def start(self, n):
791 801 """Start the controller by profile_dir."""
792 self.extra_args = ['--profile-dir=%s'%profile_dir]
793 self.profile_dir = unicode(profile_dir)
794 802 return super(WindowsHPCEngineSetLauncher, self).start(n)
795 803
796 804
797 805 #-----------------------------------------------------------------------------
798 806 # Batch (PBS) system launchers
799 807 #-----------------------------------------------------------------------------
800 808
809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
811 def _profile_dir_changed(self, name, old, new):
812 self.context[name] = new
813 _cluster_id_changed = _profile_dir_changed
814
815 def _profile_dir_default(self):
816 self.context['profile_dir'] = ''
817 return ''
818 def _cluster_id_default(self):
819 self.context['cluster_id'] = ''
820 return ''
821
822
801 823 class BatchSystemLauncher(BaseLauncher):
802 824 """Launch an external process using a batch system.
803 825
804 826 This class is designed to work with UNIX batch systems like PBS, LSF,
805 827 GridEngine, etc. The overall model is that there are different commands
806 828 like qsub, qdel, etc. that handle the starting and stopping of the process.
807 829
808 830 This class also has the notion of a batch script. The ``batch_template``
809 831 attribute can be set to a string that is a template for the batch script.
810 832 This template is instantiated using string formatting. Thus the template can
811 833 use {n} for the number of instances. Subclasses can add additional variables
812 834 to the template dict.
813 835 """
814 836
815 837 # Subclasses must fill these in. See PBSEngineSet
816 838 submit_command = List([''], config=True,
817 839 help="The name of the command line program used to submit jobs.")
818 840 delete_command = List([''], config=True,
819 841 help="The name of the command line program used to delete jobs.")
820 842 job_id_regexp = Unicode('', config=True,
821 843 help="""A regular expression used to get the job id from the output of the
822 844 submit_command.""")
823 845 batch_template = Unicode('', config=True,
824 846 help="The string that is the batch script template itself.")
825 847 batch_template_file = Unicode(u'', config=True,
826 848 help="The file that contains the batch template.")
827 849 batch_file_name = Unicode(u'batch_script', config=True,
828 850 help="The filename of the instantiated batch script.")
829 851 queue = Unicode(u'', config=True,
830 852 help="The PBS Queue.")
831 853
854 def _queue_changed(self, name, old, new):
855 self.context[name] = new
856
857 n = Int(1)
858 _n_changed = _queue_changed
859
832 860 # not configurable, override in subclasses
833 861 # PBS Job Array regex
834 862 job_array_regexp = Unicode('')
835 863 job_array_template = Unicode('')
836 864 # PBS Queue regex
837 865 queue_regexp = Unicode('')
838 866 queue_template = Unicode('')
839 867 # The default batch template, override in subclasses
840 868 default_template = Unicode('')
841 869 # The full path to the instantiated batch script.
842 870 batch_file = Unicode(u'')
843 871 # the format dict used with batch_template:
844 872 context = Dict()
845 873 # the Formatter instance for rendering the templates:
846 874 formatter = Instance(EvalFormatter, (), {})
847 875
848 876
849 877 def find_args(self):
850 878 return self.submit_command + [self.batch_file]
851 879
852 880 def __init__(self, work_dir=u'.', config=None, **kwargs):
853 881 super(BatchSystemLauncher, self).__init__(
854 882 work_dir=work_dir, config=config, **kwargs
855 883 )
856 884 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
857 885
858 886 def parse_job_id(self, output):
859 887 """Take the output of the submit command and return the job id."""
860 888 m = re.search(self.job_id_regexp, output)
861 889 if m is not None:
862 890 job_id = m.group()
863 891 else:
864 892 raise LauncherError("Job id couldn't be determined: %s" % output)
865 893 self.job_id = job_id
866 894 self.log.info('Job submitted with job id: %r' % job_id)
867 895 return job_id
868 896
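For instance, with PBS's default job_id_regexp of r'\d+', an illustrative submit output parses as:

    import re
    output = '12345.pbs-master.example.com'   # illustrative qsub output
    m = re.search(r'\d+', output)
    print m.group()   # -> 12345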
869 897 def write_batch_script(self, n):
870 898 """Instantiate and write the batch script to the work_dir."""
871 self.context['n'] = n
872 self.context['queue'] = self.queue
899 self.n = n
873 900 # first priority is batch_template if set
874 901 if self.batch_template_file and not self.batch_template:
875 902 # second priority is batch_template_file
876 903 with open(self.batch_template_file) as f:
877 904 self.batch_template = f.read()
878 905 if not self.batch_template:
879 906 # third (last) priority is default_template
880 907 self.batch_template = self.default_template
881 908
882 909 # add jobarray or queue lines to user-specified template
883 910 # note that this is *only* when user did not specify a template.
884 911 regex = re.compile(self.job_array_regexp)
885 912 # print regex.search(self.batch_template)
886 913 if not regex.search(self.batch_template):
887 914 self.log.info("adding job array settings to batch script")
888 915 firstline, rest = self.batch_template.split('\n',1)
889 916 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
890 917
891 918 regex = re.compile(self.queue_regexp)
892 919 # print regex.search(self.batch_template)
893 920 if self.queue and not regex.search(self.batch_template):
894 921 self.log.info("adding PBS queue settings to batch script")
895 922 firstline, rest = self.batch_template.split('\n',1)
896 923 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
897 924
898 925 script_as_string = self.formatter.format(self.batch_template, **self.context)
899 926 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
900 927
901 928 with open(self.batch_file, 'w') as f:
902 929 f.write(script_as_string)
903 930 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904 931
905 def start(self, n, profile_dir):
932 def start(self, n):
906 933 """Start n copies of the process using a batch system."""
907 934 # profile_dir and cluster_id are set in the context (by BatchClusterAppMixin)
908 935 # so they can be used in the batch script template as {profile_dir}
909 self.context['profile_dir'] = profile_dir
910 self.profile_dir = unicode(profile_dir)
911 936 self.write_batch_script(n)
912 937 output = check_output(self.args, env=os.environ)
913 938
914 939 job_id = self.parse_job_id(output)
915 940 self.notify_start(job_id)
916 941 return job_id
917 942
918 943 def stop(self):
919 944 output = check_output(self.delete_command+[self.job_id], env=os.environ)
920 945 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
921 946 return output
922 947
923 948
924 949 class PBSLauncher(BatchSystemLauncher):
925 950 """A BatchSystemLauncher subclass for PBS."""
926 951
927 952 submit_command = List(['qsub'], config=True,
928 953 help="The PBS submit command ['qsub']")
929 954 delete_command = List(['qdel'], config=True,
930 955 help="The PBS delete command ['qsub']")
931 956 job_id_regexp = Unicode(r'\d+', config=True,
932 957 help="Regular expresion for identifying the job ID [r'\d+']")
933 958
934 959 batch_file = Unicode(u'')
935 960 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
936 961 job_array_template = Unicode('#PBS -t 1-{n}')
937 962 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
938 963 queue_template = Unicode('#PBS -q {queue}')
939 964
940 965
941 class PBSControllerLauncher(PBSLauncher):
966 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
942 967 """Launch a controller using PBS."""
943 968
944 969 batch_file_name = Unicode(u'pbs_controller', config=True,
945 970 help="batch file name for the controller job.")
946 971 default_template= Unicode("""#!/bin/sh
947 972 #PBS -V
948 973 #PBS -N ipcontroller
949 %s --log-to-file --profile-dir={profile_dir}
974 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
950 975 """%(' '.join(ipcontroller_cmd_argv)))
951 976
952 def start(self, profile_dir):
977
978 def start(self):
953 979 """Start the controller by profile or profile_dir."""
954 980 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
981 return super(PBSControllerLauncher, self).start(1)
956 982
957 983
958 class PBSEngineSetLauncher(PBSLauncher):
984 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
959 985 """Launch Engines using PBS"""
960 986 batch_file_name = Unicode(u'pbs_engines', config=True,
961 987 help="batch file name for the engine(s) job.")
962 988 default_template= Unicode(u"""#!/bin/sh
963 989 #PBS -V
964 990 #PBS -N ipengine
965 %s --profile-dir={profile_dir}
991 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
966 992 """%(' '.join(ipengine_cmd_argv)))
967 993
968 def start(self, n, profile_dir):
994 def start(self, n):
969 995 """Start n engines by profile or profile_dir."""
970 996 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
997 return super(PBSEngineSetLauncher, self).start(n)
972 998
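To make the templates above concrete, here is roughly what the default engine script renders to; 'ipengine' stands in for ' '.join(ipengine_cmd_argv), and the profile and cluster values are hypothetical.

    # Illustrative rendering of PBSEngineSetLauncher's default template.
    template = u"""#!/bin/sh
    #PBS -V
    #PBS -N ipengine
    %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
    """ % 'ipengine'  # assumed stand-in for ' '.join(ipengine_cmd_argv)

    print(template.format(profile_dir=u'/home/user/.ipython/profile_pbs',
                          cluster_id=u'run42'))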
973 999 # SGE is very similar to PBS
974 1000
975 1001 class SGELauncher(PBSLauncher):
976 1002 """Sun GridEngine is a PBS clone with slightly different syntax"""
977 1003 job_array_regexp = Unicode(r'#\$\W+\-t')
978 1004 job_array_template = Unicode('#$ -t 1-{n}')
979 1005 queue_regexp = Unicode(r'#\$\W+-q\W+\$?\w+')
980 1006 queue_template = Unicode('#$ -q {queue}')
981 1007
982 class SGEControllerLauncher(SGELauncher):
1008 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
983 1009 """Launch a controller using SGE."""
984 1010
985 1011 batch_file_name = Unicode(u'sge_controller', config=True,
986 1012 help="batch file name for the ipcontroller job.")
987 1013 default_template= Unicode(u"""#$ -V
988 1014 #$ -S /bin/sh
989 1015 #$ -N ipcontroller
990 %s --log-to-file --profile-dir={profile_dir}
1016 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
991 1017 """%(' '.join(ipcontroller_cmd_argv)))
992 1018
993 def start(self, profile_dir):
1019 def start(self):
994 1020 """Start the controller by profile or profile_dir."""
995 1021 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
1022 return super(SGEControllerLauncher, self).start(1)
997 1023
998 class SGEEngineSetLauncher(SGELauncher):
1024 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
999 1025 """Launch Engines with SGE"""
1000 1026 batch_file_name = Unicode(u'sge_engines', config=True,
1001 1027 help="batch file name for the engine(s) job.")
1002 1028 default_template = Unicode("""#$ -V
1003 1029 #$ -S /bin/sh
1004 1030 #$ -N ipengine
1005 %s --profile-dir={profile_dir}
1031 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1006 1032 """%(' '.join(ipengine_cmd_argv)))
1007 1033
1008 def start(self, n, profile_dir):
1034 def start(self, n):
1009 1035 """Start n engines by profile or profile_dir."""
1010 1036 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1037 return super(SGEEngineSetLauncher, self).start(n)
1012 1038
1013 1039
1014 1040 # LSF launchers
1015 1041
1016 1042 class LSFLauncher(BatchSystemLauncher):
1017 1043 """A BatchSystemLauncher subclass for LSF."""
1018 1044
1019 1045 submit_command = List(['bsub'], config=True,
1020 1046 help="The LSF submit command ['bsub']")
1021 1047 delete_command = List(['bkill'], config=True,
1022 1048 help="The LSF delete command ['bkill']")
1023 1049 job_id_regexp = Unicode(r'\d+', config=True,
1024 1050 help="Regular expression for identifying the job ID [r'\d+']")
1025 1051
1026 1052 batch_file = Unicode(u'')
1027 1053 job_array_regexp = Unicode(r'#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1028 1054 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1029 1055 queue_regexp = Unicode(r'#BSUB[ \t]+-q[ \t]+\w+')
1030 1056 queue_template = Unicode('#BSUB -q {queue}')
1031 1057
1032 def start(self, n, profile_dir):
1058 def start(self, n):
1033 1059 """Start n copies of the process using LSF batch system.
1034 1060 This cant inherit from the base class because bsub expects
1035 1061 to be piped a shell script in order to honor the #BSUB directives :
1036 1062 bsub < script
1037 1063 """
1038 1064 # profile_dir is saved in the context so it can be used
1039 1065 # in the batch script template as {profile_dir}
1040 self.context['profile_dir'] = profile_dir
1041 self.profile_dir = unicode(profile_dir)
1042 1066 self.write_batch_script(n)
1043 1067 # check_output(self.args) won't do here: bsub must read the script on stdin
1044 1068 piped_cmd = self.args[0] + ' < "' + self.args[1] + '"'
1045 1069 p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
1046 1070 output, err = p.communicate()
1047 1071 job_id = self.parse_job_id(output)
1048 1072 self.notify_start(job_id)
1049 1073 return job_id
1050 1074
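Because bsub is fed the script on stdin, its stdout is what parse_job_id sees. A sketch of that extraction using job_id_regexp, where the sample bsub message is typical output assumed for illustration, not captured from this code:

    # Sketch of job-id extraction as parse_job_id presumably performs it.
    import re

    job_id_regexp = r'\d+'
    output = 'Job <1234> is submitted to default queue <normal>.'  # assumed
    m = re.search(job_id_regexp, output)
    if m is None:
        raise RuntimeError('Job id could not be determined from: %r' % output)
    job_id = m.group()
    print(job_id)  # -> 1234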
1051 1075
1052 class LSFControllerLauncher(LSFLauncher):
1076 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1053 1077 """Launch a controller using LSF."""
1054 1078
1055 1079 batch_file_name = Unicode(u'lsf_controller', config=True,
1056 1080 help="batch file name for the controller job.")
1057 1081 default_template= Unicode("""#!/bin/sh
1058 1082 #BSUB -J ipcontroller
1059 1083 #BSUB -oo ipcontroller.o.%%J
1060 1084 #BSUB -eo ipcontroller.e.%%J
1061 %s --log-to-file --profile-dir={profile_dir}
1085 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1062 1086 """%(' '.join(ipcontroller_cmd_argv)))
1063 1087
1064 def start(self, profile_dir):
1088 def start(self):
1065 1089 """Start the controller by profile or profile_dir."""
1066 1090 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1091 return super(LSFControllerLauncher, self).start(1)
1068 1092
1069 1093
1070 class LSFEngineSetLauncher(LSFLauncher):
1094 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1071 1095 """Launch Engines using LSF"""
1072 1096 batch_file_name = Unicode(u'lsf_engines', config=True,
1073 1097 help="batch file name for the engine(s) job.")
1074 1098 default_template= Unicode(u"""#!/bin/sh
1075 1099 #BSUB -oo ipengine.o.%%J
1076 1100 #BSUB -eo ipengine.e.%%J
1077 %s --profile-dir={profile_dir}
1101 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1078 1102 """%(' '.join(ipengine_cmd_argv)))
1079 1103
1080 def start(self, n, profile_dir):
1104 def start(self, n):
1081 1105 """Start n engines by profile or profile_dir."""
1082 1106 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1107 return super(LSFEngineSetLauncher, self).start(n)
1084 1108
1085 1109
1086 1110 #-----------------------------------------------------------------------------
1087 1111 # A launcher for ipcluster itself!
1088 1112 #-----------------------------------------------------------------------------
1089 1113
1090 1114
1091 1115 class IPClusterLauncher(LocalProcessLauncher):
1092 1116 """Launch the ipcluster program in an external process."""
1093 1117
1094 1118 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1095 1119 help="Popen command for ipcluster")
1096 1120 ipcluster_args = List(
1097 1121 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1098 1122 help="Command line arguments to pass to ipcluster.")
1099 1123 ipcluster_subcommand = Unicode('start')
1100 1124 ipcluster_n = Int(2)
1101 1125
1102 1126 def find_args(self):
1103 1127 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1104 1128 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1105 1129
1106 1130 def start(self):
1107 1131 self.log.info("Starting ipcluster: %r" % self.args)
1108 1132 return super(IPClusterLauncher, self).start()
1109 1133
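The argv assembled by find_args is easy to preview. In the sketch below, ['ipcluster'] is an assumed stand-in for ipcluster_cmd_argv, whose real value comes from the surrounding module:

    # Previewing the argv built by IPClusterLauncher.find_args.
    import logging

    ipcluster_cmd = ['ipcluster']  # assumed stand-in for ipcluster_cmd_argv
    ipcluster_subcommand = 'start'
    ipcluster_n = 2
    ipcluster_args = ['--clean-logs', '--log-to-file',
                      '--log-level=%i' % logging.INFO]

    args = (ipcluster_cmd + [ipcluster_subcommand]
            + ['--n=%i' % ipcluster_n] + ipcluster_args)
    print(args)
    # ['ipcluster', 'start', '--n=2', '--clean-logs', '--log-to-file',
    #  '--log-level=20']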
1110 1134 #-----------------------------------------------------------------------------
1111 1135 # Collections of launchers
1112 1136 #-----------------------------------------------------------------------------
1113 1137
1114 1138 local_launchers = [
1115 1139 LocalControllerLauncher,
1116 1140 LocalEngineLauncher,
1117 1141 LocalEngineSetLauncher,
1118 1142 ]
1119 1143 mpi_launchers = [
1120 1144 MPIExecLauncher,
1121 1145 MPIExecControllerLauncher,
1122 1146 MPIExecEngineSetLauncher,
1123 1147 ]
1124 1148 ssh_launchers = [
1125 1149 SSHLauncher,
1126 1150 SSHControllerLauncher,
1127 1151 SSHEngineLauncher,
1128 1152 SSHEngineSetLauncher,
1129 1153 ]
1130 1154 winhpc_launchers = [
1131 1155 WindowsHPCLauncher,
1132 1156 WindowsHPCControllerLauncher,
1133 1157 WindowsHPCEngineSetLauncher,
1134 1158 ]
1135 1159 pbs_launchers = [
1136 1160 PBSLauncher,
1137 1161 PBSControllerLauncher,
1138 1162 PBSEngineSetLauncher,
1139 1163 ]
1140 1164 sge_launchers = [
1141 1165 SGELauncher,
1142 1166 SGEControllerLauncher,
1143 1167 SGEEngineSetLauncher,
1144 1168 ]
1145 1169 lsf_launchers = [
1146 1170 LSFLauncher,
1147 1171 LSFControllerLauncher,
1148 1172 LSFEngineSetLauncher,
1149 1173 ]
1150 1174 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1151 1175 + pbs_launchers + sge_launchers + lsf_launchers
1152 1176
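One way these collections can be consumed, sketched with dummy classes so the snippet runs standalone; in the real module the entries are the launcher classes defined above, and the lookup helper here is hypothetical:

    # Standalone sketch: selecting a launcher from a collection by role.
    class PBSLauncher(object): pass
    class PBSControllerLauncher(PBSLauncher): pass
    class PBSEngineSetLauncher(PBSLauncher): pass

    pbs_launchers = [PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher]

    def find_launcher(launchers, suffix):
        """Return the first launcher class whose name ends with suffix."""
        for cls in launchers:
            if cls.__name__.endswith(suffix):
                return cls
        raise KeyError(suffix)

    print(find_launcher(pbs_launchers, 'ControllerLauncher').__name__)
    # -> PBSControllerLauncher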