##// END OF EJS Templates
Adding files for the refactored kernel scripts.
Brian Granger -
Show More
@@ -0,0 +1,66 b''
1 import os
2
3 c = get_config()
4
5 # Options are:
6 # * LocalControllerLauncher
7 # * PBSControllerLauncher
8 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
9
10 # Options are:
11 # * LocalEngineSetLauncher
12 # * MPIExecEngineSetLauncher
13 # * PBSEngineSetLauncher
14 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
15
16 # c.Global.log_to_file = False
17 # c.Global.n = 2
18 # c.Global.reset_config = False
19
20 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
21 # c.MPIExecLauncher.mpi_args = []
22 # c.MPIExecLauncher.program = []
23 # c.MPIExecLauncher.program_args = []
24 # c.MPIExecLauncher.n = 1
25
26 # c.SSHLauncher.ssh_cmd = ['ssh']
27 # c.SSHLauncher.ssh_args = []
28 # c.SSHLauncher.program = []
29 # s.SSHLauncher.program_args = []
30 # c.SSHLauncher.hostname = ''
31 # c.SSHLauncher.user = os.environ['USER']
32
33 # c.PBSLauncher.submit_command = 'qsub'
34 # c.PBSLauncher.delete_command = 'qdel'
35 # c.PBSLauncher.job_id_regexp = '\d+'
36 # c.PBSLauncher.batch_template = """"""
37 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
38
39 # c.LocalControllerLauncher.controller_args = []
40
41 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
42 # c.MPIExecControllerLauncher.mpi_args = []
43 # c.MPIExecControllerLauncher.controller_args = []
44 # c.MPIExecControllerLauncher.n = 1
45
46 # c.PBSControllerLauncher.submit_command = 'qsub'
47 # c.PBSControllerLauncher.delete_command = 'qdel'
48 # c.PBSControllerLauncher.job_id_regexp = '\d+'
49 # c.PBSControllerLauncher.batch_template = """"""
50 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
51
52 # c.LocalEngineLauncher.engine_args = []
53
54 # c.LocalEngineSetLauncher.engine_args = []
55
56 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
57 # c.MPIExecEngineSetLauncher.mpi_args = []
58 # c.MPIExecEngineSetLauncher.controller_args = []
59 # c.MPIExecEngineSetLauncher.n = 1
60
61 # c.PBSEngineSetLauncher.submit_command = 'qsub'
62 # c.PBSEngineSetLauncher.delete_command = 'qdel'
63 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
64 # c.PBSEngineSetLauncher.batch_template = """"""
65 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script'
66
@@ -0,0 +1,283 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 The ipcluster application.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import logging
19 import os
20 import signal
21 import sys
22
23 from IPython.core import release
24 from IPython.external import argparse
25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
26 from IPython.utils.importstring import import_item
27
28 from IPython.kernel.clusterdir import (
29 ApplicationWithClusterDir, ClusterDirError
30 )
31
32 from twisted.internet import reactor, defer
33 from twisted.python import log
34
35 #-----------------------------------------------------------------------------
36 # Code for launchers
37 #-----------------------------------------------------------------------------
38
39
40
41 #-----------------------------------------------------------------------------
42 # The ipcluster application
43 #-----------------------------------------------------------------------------
44
45
46 class IPClusterCLLoader(ArgParseConfigLoader):
47
48 def _add_arguments(self):
49 # This has all the common options that all subcommands use
50 parent_parser1 = argparse.ArgumentParser(add_help=False)
51 parent_parser1.add_argument('-ipythondir', '--ipython-dir',
52 dest='Global.ipythondir',type=str,
53 help='Set to override default location of Global.ipythondir.',
54 default=NoConfigDefault,
55 metavar='Global.ipythondir')
56 parent_parser1.add_argument('-log_level', '--log-level',
57 dest="Global.log_level",type=int,
58 help='Set the log level (0,10,20,30,40,50). Default is 30.',
59 default=NoConfigDefault,
60 metavar='Global.log_level')
61
62 # This has all the common options that other subcommands use
63 parent_parser2 = argparse.ArgumentParser(add_help=False)
64 parent_parser2.add_argument('-p','-profile', '--profile',
65 dest='Global.profile',type=str,
66 default=NoConfigDefault,
67 help='The string name of the profile to be used. This determines '
68 'the name of the cluster dir as: cluster_<profile>. The default profile '
69 'is named "default". The cluster directory is resolve this way '
70 'if the --cluster-dir option is not used.',
71 default=NoConfigDefault,
72 metavar='Global.profile')
73 parent_parser2.add_argument('-cluster_dir', '--cluster-dir',
74 dest='Global.cluster_dir',type=str,
75 default=NoConfigDefault,
76 help='Set the cluster dir. This overrides the logic used by the '
77 '--profile option.',
78 default=NoConfigDefault,
79 metavar='Global.cluster_dir')
80 parent_parser2.add_argument('--log-to-file',
81 action='store_true', dest='Global.log_to_file',
82 default=NoConfigDefault,
83 help='Log to a file in the log directory (default is stdout)'
84 )
85
86 subparsers = self.parser.add_subparsers(
87 dest='Global.subcommand',
88 title='ipcluster subcommands',
89 description='ipcluster has a variety of subcommands. '
90 'The general way of running ipcluster is "ipcluster <cmd> '
91 ' [options]""',
92 help='For more help, type "ipcluster <cmd> -h"')
93
94 parser_list = subparsers.add_parser(
95 'list',
96 help='List all clusters in cwd and ipythondir.',
97 parents=[parent_parser1]
98 )
99
100 parser_create = subparsers.add_parser(
101 'create',
102 help='Create a new cluster directory.',
103 parents=[parent_parser1, parent_parser2]
104 )
105 parser_create.add_argument(
106 '--reset-config',
107 dest='Global.reset_config', action='store_true',
108 default=NoConfigDefault,
109 help='Recopy the default config files to the cluster directory. '
110 'You will loose any modifications you have made to these files.'
111 )
112
113 parser_start = subparsers.add_parser(
114 'start',
115 help='Start a cluster.',
116 parents=[parent_parser1, parent_parser2]
117 )
118 parser_start.add_argument(
119 '-n', '--number',
120 type=int, dest='Global.n',
121 default=NoConfigDefault,
122 help='The number of engines to start.',
123 metavar='Global.n'
124 )
125
126
127 default_config_file_name = 'ipcluster_config.py'
128
129
130 class IPClusterApp(ApplicationWithClusterDir):
131
132 name = 'ipcluster'
133 description = 'Start an IPython cluster (controller and engines).'
134 config_file_name = default_config_file_name
135 default_log_level = logging.INFO
136 auto_create_cluster_dir = False
137
138 def create_default_config(self):
139 super(IPClusterApp, self).create_default_config()
140 self.default_config.Global.controller_launcher = \
141 'IPython.kernel.launcher.LocalControllerLauncher'
142 self.default_config.Global.engine_launcher = \
143 'IPython.kernel.launcher.LocalEngineSetLauncher'
144 self.default_config.Global.log_to_file = False
145 self.default_config.Global.n = 2
146 self.default_config.Global.reset_config = False
147
148 def create_command_line_config(self):
149 """Create and return a command line config loader."""
150 return IPClusterCLLoader(
151 description=self.description,
152 version=release.version
153 )
154
155 def find_resources(self):
156 subcommand = self.command_line_config.Global.subcommand
157 if subcommand=='list':
158 self.list_cluster_dirs()
159 # Exit immediately because there is nothing left to do.
160 self.exit()
161 elif subcommand=='create':
162 self.auto_create_cluster_dir = True
163 super(IPClusterApp, self).find_resources()
164 elif subcommand=='start':
165 self.auto_create_cluster_dir = False
166 try:
167 super(IPClusterApp, self).find_resources()
168 except ClusterDirError:
169 raise ClusterDirError(
170 "Could not find a cluster directory. A cluster dir must "
171 "be created before running 'ipcluster start'. Do "
172 "'ipcluster create -h' or 'ipcluster list -h' for more "
173 "information about creating and listing cluster dirs."
174 )
175 def construct(self):
176 config = self.master_config
177 if config.Global.subcommand=='list':
178 pass
179 elif config.Global.subcommand=='create':
180 self.log.info('Copying default config files to cluster directory '
181 '[overwrite=%r]' % (config.Global.reset_config,))
182 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
183 elif config.Global.subcommand=='start':
184 self.start_logging()
185 reactor.callWhenRunning(self.start_launchers)
186
187 def list_cluster_dirs(self):
188 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
189 if cluster_dir_paths:
190 cluster_dir_paths = cluster_dir_paths.split(':')
191 else:
192 cluster_dir_paths = []
193 # We need to look both in default_config and command_line_config!!!
194 paths = [os.getcwd(), self.default_config.Global.ipythondir] + \
195 cluster_dir_paths
196 self.log.info('Searching for cluster dirs in paths: %r' % paths)
197 for path in paths:
198 files = os.listdir(path)
199 for f in files:
200 full_path = os.path.join(path, f)
201 if os.path.isdir(full_path) and f.startswith('cluster_'):
202 profile = full_path.split('_')[-1]
203 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
204 print start_cmd + " ==> " + full_path
205
206 def start_logging(self):
207 if self.master_config.Global.log_to_file:
208 log_filename = self.name + '-' + str(os.getpid()) + '.log'
209 logfile = os.path.join(self.log_dir, log_filename)
210 open_log_file = open(logfile, 'w')
211 else:
212 open_log_file = sys.stdout
213 log.startLogging(open_log_file)
214
215 def start_launchers(self):
216 config = self.master_config
217
218 # Create the launchers
219 el_class = import_item(config.Global.engine_launcher)
220 self.engine_launcher = el_class(
221 self.cluster_dir, config=config
222 )
223 cl_class = import_item(config.Global.controller_launcher)
224 self.controller_launcher = cl_class(
225 self.cluster_dir, config=config
226 )
227
228 # Setup signals
229 signal.signal(signal.SIGINT, self.stop_launchers)
230
231 # Setup the observing of stopping
232 d1 = self.controller_launcher.observe_stop()
233 d1.addCallback(self.stop_engines)
234 d1.addErrback(self.err_and_stop)
235 # If this triggers, just let them die
236 # d2 = self.engine_launcher.observe_stop()
237
238 # Start the controller and engines
239 d = self.controller_launcher.start(
240 profile=None, cluster_dir=config.Global.cluster_dir
241 )
242 d.addCallback(lambda _: self.start_engines())
243 d.addErrback(self.err_and_stop)
244
245 def err_and_stop(self, f):
246 log.msg('Unexpected error in ipcluster:')
247 log.err(f)
248 reactor.stop()
249
250 def stop_engines(self, r):
251 return self.engine_launcher.stop()
252
253 def start_engines(self):
254 config = self.master_config
255 d = self.engine_launcher.start(
256 config.Global.n,
257 profile=None, cluster_dir=config.Global.cluster_dir
258 )
259 return d
260
261 def stop_launchers(self, signum, frame):
262 log.msg("Stopping cluster")
263 d1 = self.engine_launcher.stop()
264 d1.addCallback(lambda _: self.controller_launcher.stop)
265 d1.addErrback(self.err_and_stop)
266 reactor.callLater(2.0, reactor.stop)
267
268 def start_app(self):
269 config = self.master_config
270 if config.Global.subcommand=='create' or config.Global.subcommand=='list':
271 return
272 elif config.Global.subcommand=='start':
273 reactor.run()
274
275
276 def launch_new_instance():
277 """Create and run the IPython cluster."""
278 app = IPClusterApp()
279 app.start()
280
281
282 if __name__ == '__main__':
283 launch_new_instance() No newline at end of file
This diff has been collapsed as it changes many lines, (585 lines changed) Show them Hide them
@@ -0,0 +1,585 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3 """
4 Facilities for launching processing asynchronously.
5 """
6
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 import os
19 import re
20 import sys
21
22 from IPython.core.component import Component
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26
27 from twisted.internet import reactor, defer
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.internet.protocol import ProcessProtocol
30 from twisted.internet.utils import getProcessOutput
31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 from twisted.python import log
33 from twisted.python.failure import Failure
34
35 #-----------------------------------------------------------------------------
36 # Generic launchers
37 #-----------------------------------------------------------------------------
38
39
40 class LauncherError(Exception):
41 pass
42
43
44 class ProcessStateError(LauncherError):
45 pass
46
47
48 class UnknownStatus(LauncherError):
49 pass
50
51
52 class BaseLauncher(Component):
53 """An asbtraction for starting, stopping and signaling a process."""
54
55 working_dir = Unicode(u'')
56
57 def __init__(self, working_dir, parent=None, name=None, config=None):
58 super(BaseLauncher, self).__init__(parent, name, config)
59 self.working_dir = working_dir
60 self.state = 'before' # can be before, running, after
61 self.stop_deferreds = []
62 self.start_data = None
63 self.stop_data = None
64
65 @property
66 def args(self):
67 """A list of cmd and args that will be used to start the process."""
68 return self.find_args()
69
70 def find_args(self):
71 """The ``.args`` property calls this to find the args list."""
72 raise NotImplementedError('find_args must be implemented in a subclass')
73
74 @property
75 def arg_str(self):
76 """The string form of the program arguments."""
77 return ' '.join(self.args)
78
79 @property
80 def running(self):
81 if self.state == 'running':
82 return True
83 else:
84 return False
85
86 def start(self):
87 """Start the process.
88
89 This must return a deferred that fires with information about the
90 process starting (like a pid, job id, etc.)
91 """
92 return defer.fail(
93 Failure(NotImplementedError(
94 'start must be implemented in a subclass')
95 )
96 )
97
98 def stop(self):
99 """Stop the process and notify observers of ProcessStopped.
100
101 This must return a deferred that fires with any errors that occur
102 while the process is attempting to be shut down. This deferred
103 won't fire when the process actually stops. These events are
104 handled by calling :func:`observe_stop`.
105 """
106 return defer.fail(
107 Failure(NotImplementedError(
108 'stop must be implemented in a subclass')
109 )
110 )
111
112 def observe_stop(self):
113 """Get a deferred that will fire when the process stops.
114
115 The deferred will fire with data that contains information about
116 the exit status of the process.
117 """
118 if self.state=='after':
119 return defer.succeed(self.stop_data)
120 else:
121 d = defer.Deferred()
122 self.stop_deferreds.append(d)
123 return d
124
125 def notify_start(self, data):
126 """Call this to tigger startup actions.
127
128 This logs the process startup and sets the state to running. It is
129 a pass-through so it can be used as a callback.
130 """
131
132 log.msg('Process %r started: %r' % (self.args[0], data))
133 self.start_data = data
134 self.state = 'running'
135 return data
136
137 def notify_stop(self, data):
138 """Call this to trigger all the deferreds from :func:`observe_stop`."""
139
140 log.msg('Process %r stopped: %r' % (self.args[0], data))
141 self.stop_data = data
142 self.state = 'after'
143 for i in range(len(self.stop_deferreds)):
144 d = self.stop_deferreds.pop()
145 d.callback(data)
146 return data
147
148 def signal(self, sig):
149 """Signal the process.
150
151 Return a semi-meaningless deferred after signaling the process.
152
153 Parameters
154 ----------
155 sig : str or int
156 'KILL', 'INT', etc., or any signal number
157 """
158 return defer.fail(
159 Failure(NotImplementedError(
160 'signal must be implemented in a subclass')
161 )
162 )
163
164
165 class LocalProcessLauncherProtocol(ProcessProtocol):
166 """A ProcessProtocol to go with the LocalProcessLauncher."""
167
168 def __init__(self, process_launcher):
169 self.process_launcher = process_launcher
170 self.pid = None
171
172 def connectionMade(self):
173 self.pid = self.transport.pid
174 self.process_launcher.notify_start(self.transport.pid)
175
176 def processEnded(self, status):
177 value = status.value
178 if isinstance(value, ProcessDone):
179 self.process_launcher.notify_stop(
180 {'exit_code':0,
181 'signal':None,
182 'status':None,
183 'pid':self.pid
184 }
185 )
186 elif isinstance(value, ProcessTerminated):
187 self.process_launcher.notify_stop(
188 {'exit_code':value.exitCode,
189 'signal':value.signal,
190 'status':value.status,
191 'pid':self.pid
192 }
193 )
194 else:
195 raise UnknownStatus("Unknown exit status, this is probably a "
196 "bug in Twisted")
197
198 def outReceived(self, data):
199 log.msg(data)
200
201 def errReceived(self, data):
202 log.err(data)
203
204
205 class LocalProcessLauncher(BaseLauncher):
206 """Start and stop an external process in an asynchronous manner."""
207
208 cmd_and_args = List([])
209
210 def __init__(self, working_dir, parent=None, name=None, config=None):
211 super(LocalProcessLauncher, self).__init__(
212 working_dir, parent, name, config
213 )
214 self.process_protocol = None
215 self.start_deferred = None
216
217 def find_args(self):
218 return self.cmd_and_args
219
220 def start(self):
221 if self.state == 'before':
222 self.process_protocol = LocalProcessLauncherProtocol(self)
223 self.start_deferred = defer.Deferred()
224 self.process_transport = reactor.spawnProcess(
225 self.process_protocol,
226 str(self.args[0]),
227 [str(a) for a in self.args],
228 env=os.environ
229 )
230 return self.start_deferred
231 else:
232 s = 'The process was already started and has state: %r' % self.state
233 return defer.fail(ProcessStateError(s))
234
235 def notify_start(self, data):
236 super(LocalProcessLauncher, self).notify_start(data)
237 self.start_deferred.callback(data)
238
239 def stop(self):
240 return self.interrupt_then_kill()
241
242 @make_deferred
243 def signal(self, sig):
244 if self.state == 'running':
245 self.process_transport.signalProcess(sig)
246
247 @inlineCallbacks
248 def interrupt_then_kill(self, delay=1.0):
249 yield self.signal('INT')
250 yield sleep_deferred(delay)
251 yield self.signal('KILL')
252
253
254 class MPIExecLauncher(LocalProcessLauncher):
255
256 mpi_cmd = List(['mpiexec'], config=True)
257 mpi_args = List([], config=True)
258 program = List(['date'], config=True)
259 program_args = List([], config=True)
260 n = Int(1, config=True)
261
262 def find_args(self):
263 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
264 self.program + self.program_args
265
266 def start(self, n):
267 self.n = n
268 return super(MPIExecLauncher, self).start()
269
270
271 class SSHLauncher(BaseLauncher):
272 """A minimal launcher for ssh.
273
274 To be useful this will probably have to be extended to use the ``sshx``
275 idea for environment variables. There could be other things this needs
276 as well.
277 """
278
279 ssh_cmd = List(['ssh'], config=True)
280 ssh_args = List([], config=True)
281 program = List(['date'], config=True)
282 program_args = List([], config=True)
283 hostname = Str('', config=True)
284 user = Str(os.environ['USER'], config=True)
285 location = Str('')
286
287 def _hostname_changed(self, name, old, new):
288 self.location = '%s@%s' % (self.user, new)
289
290 def _user_changed(self, name, old, new):
291 self.location = '%s@%s' % (new, self.hostname)
292
293 def find_args(self):
294 return self.ssh_cmd + self.ssh_args + [self.location] + \
295 self.program + self.program_args
296
297 def start(self, n, hostname=None, user=None):
298 if hostname is not None:
299 self.hostname = hostname
300 if user is not None:
301 self.user = user
302 return super(SSHLauncher, self).start()
303
304
305 class WindowsHPCLauncher(BaseLauncher):
306 pass
307
308
309 class BatchSystemLauncher(BaseLauncher):
310
311 # Subclasses must fill these in. See PBSEngineSet
312 submit_command = Str('', config=True)
313 delete_command = Str('', config=True)
314 job_id_regexp = Str('', config=True)
315 batch_template = Str('', config=True)
316 batch_file_name = Unicode(u'batch_script', config=True)
317 batch_file = Unicode(u'')
318
319 def __init__(self, working_dir, parent=None, name=None, config=None):
320 super(BatchSystemLauncher, self).__init__(
321 working_dir, parent, name, config
322 )
323 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
324 self.context = {}
325
326 def parse_job_id(self, output):
327 m = re.match(self.job_id_regexp, output)
328 if m is not None:
329 job_id = m.group()
330 else:
331 raise LauncherError("Job id couldn't be determined: %s" % output)
332 self.job_id = job_id
333 log.msg('Job started with job id: %r' % job_id)
334 return job_id
335
336 def write_batch_script(self, n):
337 self.context['n'] = n
338 script_as_string = Itpl.itplns(self.batch_template, self.context)
339 log.msg('Writing instantiated batch script: %s' % self.batch_file)
340 f = open(self.batch_file, 'w')
341 f.write(script_as_string)
342 f.close()
343
344 @inlineCallbacks
345 def start(self, n):
346 """Start n copies of the process using a batch system."""
347 self.write_batch_script(n)
348 output = yield getProcessOutput(self.submit_command,
349 [self.batch_file], env=os.environ)
350 job_id = self.parse_job_id(output)
351 self.notify_start(job_id)
352 defer.returnValue(job_id)
353
354 @inlineCallbacks
355 def stop(self):
356 output = yield getProcessOutput(self.delete_command,
357 [self.job_id], env=os.environ
358 )
359 self.notify_stop(output) # Pass the output of the kill cmd
360 defer.returnValue(output)
361
362
363 class PBSLauncher(BatchSystemLauncher):
364
365 submit_command = Str('qsub', config=True)
366 delete_command = Str('qdel', config=True)
367 job_id_regexp = Str('\d+', config=True)
368 batch_template = Str('', config=True)
369 batch_file_name = Unicode(u'pbs_batch_script', config=True)
370 batch_file = Unicode(u'')
371
372
373 #-----------------------------------------------------------------------------
374 # Controller launchers
375 #-----------------------------------------------------------------------------
376
377 def find_controller_cmd():
378 if sys.platform == 'win32':
379 # This logic is needed because the ipcontroller script doesn't
380 # always get installed in the same way or in the same location.
381 from IPython.kernel import ipcontrollerapp
382 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
383 # The -u option here turns on unbuffered output, which is required
384 # on Win32 to prevent wierd conflict and problems with Twisted.
385 # Also, use sys.executable to make sure we are picking up the
386 # right python exe.
387 cmd = [sys.executable, '-u', script_location]
388 else:
389 # ipcontroller has to be on the PATH in this case.
390 cmd = ['ipcontroller']
391 return cmd
392
393
394 class LocalControllerLauncher(LocalProcessLauncher):
395
396 controller_cmd = List(find_controller_cmd())
397 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
398
399 def find_args(self):
400 return self.controller_cmd + self.controller_args
401
402 def start(self, profile=None, cluster_dir=None):
403 if cluster_dir is not None:
404 self.controller_args.extend(['--cluster-dir', cluster_dir])
405 if profile is not None:
406 self.controller_args.extend(['--profile', profile])
407 log.msg("Starting LocalControllerLauncher: %r" % self.args)
408 return super(LocalControllerLauncher, self).start()
409
410
411 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
412 pass
413
414
415 class MPIExecControllerLauncher(MPIExecLauncher):
416
417 controller_cmd = List(find_controller_cmd(), config=False)
418 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
419 n = Int(1, config=False)
420
421 def start(self, profile=None, cluster_dir=None):
422 if cluster_dir is not None:
423 self.controller_args.extend(['--cluster-dir', cluster_dir])
424 if profile is not None:
425 self.controller_args.extend(['--profile', profile])
426 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
427 return super(MPIExecControllerLauncher, self).start(1)
428
429
430 def find_args(self):
431 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
432 self.controller_cmd + self.controller_args
433
434
435 class PBSControllerLauncher(PBSLauncher):
436
437 def start(self, profile=None, cluster_dir=None):
438 # Here we save profile and cluster_dir in the context so they
439 # can be used in the batch script template as ${profile} and
440 # ${cluster_dir}
441 if cluster_dir is not None:
442 self.context['cluster_dir'] = cluster_dir
443 if profile is not None:
444 self.context['profile'] = profile
445 log.msg("Starting PBSControllerLauncher: %r" % self.args)
446 return super(PBSControllerLauncher, self).start(1)
447
448
449 class SSHControllerLauncher(SSHLauncher):
450 pass
451
452
453 #-----------------------------------------------------------------------------
454 # Engine launchers
455 #-----------------------------------------------------------------------------
456
457
458 def find_engine_cmd():
459 if sys.platform == 'win32':
460 # This logic is needed because the ipengine script doesn't
461 # always get installed in the same way or in the same location.
462 from IPython.kernel import ipengineapp
463 script_location = ipengineapp.__file__.replace('.pyc', '.py')
464 # The -u option here turns on unbuffered output, which is required
465 # on Win32 to prevent wierd conflict and problems with Twisted.
466 # Also, use sys.executable to make sure we are picking up the
467 # right python exe.
468 cmd = [sys.executable, '-u', script_location]
469 else:
470 # ipcontroller has to be on the PATH in this case.
471 cmd = ['ipengine']
472 return cmd
473
474
475 class LocalEngineLauncher(LocalProcessLauncher):
476
477 engine_cmd = List(find_engine_cmd())
478 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
479
480 def find_args(self):
481 return self.engine_cmd + self.engine_args
482
483 def start(self, profile=None, cluster_dir=None):
484 if cluster_dir is not None:
485 self.engine_args.extend(['--cluster-dir', cluster_dir])
486 if profile is not None:
487 self.engine_args.extend(['--profile', profile])
488 return super(LocalEngineLauncher, self).start()
489
490
491 class LocalEngineSetLauncher(BaseLauncher):
492
493 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
494
495 def __init__(self, working_dir, parent=None, name=None, config=None):
496 super(LocalEngineSetLauncher, self).__init__(
497 working_dir, parent, name, config
498 )
499 self.launchers = []
500
501 def start(self, n, profile=None, cluster_dir=None):
502 dlist = []
503 for i in range(n):
504 el = LocalEngineLauncher(self.working_dir, self)
505 # Copy the engine args over to each engine launcher.
506 import copy
507 el.engine_args = copy.deepcopy(self.engine_args)
508 d = el.start(profile, cluster_dir)
509 if i==0:
510 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
511 self.launchers.append(el)
512 dlist.append(d)
513 # The consumeErrors here could be dangerous
514 dfinal = gatherBoth(dlist, consumeErrors=True)
515 dfinal.addCallback(self.notify_start)
516 return dfinal
517
518 def find_args(self):
519 return ['engine set']
520
521 def signal(self, sig):
522 dlist = []
523 for el in self.launchers:
524 d = el.signal(sig)
525 dlist.append(d)
526 dfinal = gatherBoth(dlist, consumeErrors=True)
527 return dfinal
528
529 def interrupt_then_kill(self, delay=1.0):
530 dlist = []
531 for el in self.launchers:
532 d = el.interrupt_then_kill(delay)
533 dlist.append(d)
534 dfinal = gatherBoth(dlist, consumeErrors=True)
535 return dfinal
536
537 def stop(self):
538 return self.interrupt_then_kill()
539
540 def observe_stop(self):
541 dlist = [el.observe_stop() for el in self.launchers]
542 dfinal = gatherBoth(dlist, consumeErrors=False)
543 dfinal.addCallback(self.notify_stop)
544 return dfinal
545
546
547 class MPIExecEngineSetLauncher(MPIExecLauncher):
548
549 engine_cmd = List(find_engine_cmd(), config=False)
550 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
551 n = Int(1, config=True)
552
553 def start(self, n, profile=None, cluster_dir=None):
554 if cluster_dir is not None:
555 self.engine_args.extend(['--cluster-dir', cluster_dir])
556 if profile is not None:
557 self.engine_args.extend(['--profile', profile])
558 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
559 return super(MPIExecEngineSetLauncher, self).start(n)
560
561 def find_args(self):
562 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
563 self.engine_cmd + self.engine_args
564
565
566 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
567 pass
568
569
570 class PBSEngineSetLauncher(PBSLauncher):
571
572 def start(self, n, profile=None, cluster_dir=None):
573 if cluster_dir is not None:
574 self.program_args.extend(['--cluster-dir', cluster_dir])
575 if profile is not None:
576 self.program_args.extend(['-p', profile])
577 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
578 return super(PBSEngineSetLauncher, self).start(n)
579
580
581 class SSHEngineSetLauncher(BaseLauncher):
582 pass
583
584
585
General Comments 0
You need to be logged in to leave comments. Login now