##// END OF EJS Templates
force /bin/sh in default SGE/PBS/LFS templates
Justin Riley -
Show More
@@ -1,1001 +1,1003 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import stat
21 import stat
22 import tempfile
22 import tempfile
23 pjoin = os.path.join
23 pjoin = os.path.join
24
24
25 from twisted.internet import reactor, defer
25 from twisted.internet import reactor, defer
26 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.protocol import ProcessProtocol
27 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.error import ProcessDone, ProcessTerminated
28 from twisted.internet.utils import getProcessOutput
28 from twisted.internet.utils import getProcessOutput
29 from twisted.python import failure, log
29 from twisted.python import failure, log
30
30
31 from IPython.external import argparse
31 from IPython.external import argparse
32 from IPython.external import Itpl
32 from IPython.external import Itpl
33 from IPython.genutils import (
33 from IPython.genutils import (
34 get_ipython_dir,
34 get_ipython_dir,
35 get_log_dir,
35 get_log_dir,
36 get_security_dir,
36 get_security_dir,
37 num_cpus
37 num_cpus
38 )
38 )
39 from IPython.kernel.fcutil import have_crypto
39 from IPython.kernel.fcutil import have_crypto
40
40
41 # Create various ipython directories if they don't exist.
41 # Create various ipython directories if they don't exist.
42 # This must be done before IPython.kernel.config is imported.
42 # This must be done before IPython.kernel.config is imported.
43 from IPython.iplib import user_setup
43 from IPython.iplib import user_setup
# Pick the rc-file suffix for this platform: POSIX profiles use bare
# names, everything else (Windows) uses '.ini' files.
if os.name == 'posix':
    rc_suffix = ''
else:
    rc_suffix = '.ini'
# Create the user's IPython directory tree non-interactively so that the
# IPython.kernel.config import below finds it already in place.
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
# Ensure the log and security directories exist as well.
get_log_dir()
get_security_dir()
51
51
52 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.config import config_manager as kernel_config_manager
53 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.error import SecurityError, FileTimeoutError
54 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.fcutil import have_crypto
55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
56 from IPython.kernel.util import printer
56 from IPython.kernel.util import printer
57
57
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59 # General process handling code
59 # General process handling code
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61
61
62
62
class ProcessStateError(Exception):
    """Raised when an operation is attempted on a process in the wrong state."""
65
65
class UnknownStatus(Exception):
    """Raised when a process exit status cannot be classified."""
68
68
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Translates Twisted process events into calls on the owning
    ProcessLauncher's start/stop deferreds.
    """
    def __init__(self, process_launcher):
        # The ProcessLauncher this protocol reports state changes back to.
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child process is up: hand its pid to the launcher's
        # start deferred.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        # status.value distinguishes a clean exit from a termination.
        value = status.value
        if isinstance(value, ProcessDone):
            # Clean exit: report exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            # Abnormal exit: report the code/signal/status details as a dict.
            self.process_launcher.fire_stop_deferred(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward the child's stdout to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward the child's stderr to the Twisted error log.
        log.err(data)
98
98
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the whole list (including the
        # executable itself) is passed to spawnProcess as argv.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Reset all per-run state back to the not-yet-started condition.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the spawned process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        # Called by LauncherProcessProtocol.connectionMade once the child
        # exists: record the pid and fire the start deferred with it.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; return a deferred that fires with its pid.

        Fails with ProcessStateError if the process was already started.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires with the exit status once the
        process stops.

        Fails with ProcessStateError if the process is already complete.
        """
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by LauncherProcessProtocol.processEnded; exit_code is 0 for
        # a clean exit or a dict of exit/signal/status details otherwise.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Silently does nothing unless the process is currently running.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        # Send SIGINT now and schedule a SIGKILL after `delay` seconds in
        # case the process ignores the interrupt.
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
178
178
179
179
180 #-----------------------------------------------------------------------------
180 #-----------------------------------------------------------------------------
181 # Code for launching controller and engines
181 # Code for launching controller and engines
182 #-----------------------------------------------------------------------------
182 #-----------------------------------------------------------------------------
183
183
184
184
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher that starts a local ipcontroller process."""

    def __init__(self, extra_args=None):
        # extra_args, if given, are appended to the ipcontroller command line.
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
205
205
206
206
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher that starts a local ipengine process."""

    def __init__(self, extra_args=None):
        # extra_args, if given, are appended to the ipengine command line.
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
227
227
228
228
class LocalEngineSet(object):
    """Start and manage a set of engines as local child processes."""

    def __init__(self, extra_args=None):
        # extra_args are forwarded to every EngineLauncher we create.
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start n local engines; return a deferred list of their pids."""
        dlist = []
        for i in range(n):
            print "starting engine:", i
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        # Log the pids gathered from all the engine start deferreds.
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        # Log the exit statuses gathered from all the stop deferreds.
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send signal `sig` to every engine; return a deferred that fires
        once all engines have stopped."""
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """SIGINT every engine, then SIGKILL each one after `delay` seconds;
        return a deferred that fires once all engines have stopped."""
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
274
274
class BatchEngineSet(object):
    """Base class for engine sets submitted through a batch queueing system.

    Subclasses must fill in the class attributes below. See
    PBSEngineSet/SGEEngineSet for examples.
    """

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''                # human-readable scheduler name, for log messages
    submit_command = ''      # command that submits a job script
    delete_command = ''      # command that deletes a submitted job
    job_id_regexp = ''       # regexp extracting the job id from submit output
    job_array_regexp = ''    # regexp detecting a job-array directive
    job_array_template = ''  # directive template taking %d = number of engines
    queue_regexp = ''        # regexp detecting a queue directive
    queue_template = ''      # directive template taking %s = queue name
    default_template = ''    # full job script used when no template file given

    def __init__(self, template_file, queue, **kwargs):
        """Create the set from a user template file (may be None/empty)
        and an optional queue name."""
        self.template_file = template_file
        self.queue = queue

    def parse_job_id(self, output):
        """Extract and record the job id from the submit command's output.

        Raises Exception when job_id_regexp does not match `output`.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        """Log a submission failure's traceback and re-raise its exception."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a job (array) starting n engines.

        Returns a deferred that fires with the parsed job id.
        """
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        if self.template_file:
            # Start from the user-supplied script, adding any missing
            # job-array or queue directives.
            log.msg("Using %s script %s" % (self.name, self.template_file))
            template = open(self.template_file, 'r')
            contents = template.read()
            template.close()
            new_script = contents
            regex = re.compile(self.job_array_regexp)
            if not regex.search(contents):
                log.msg("adding job array settings to %s script" % self.name)
                new_script = self.job_array_template % n +'\n' + new_script
            regex = re.compile(self.queue_regexp)
            if self.queue and not regex.search(contents):
                log.msg("adding queue settings to %s script" % self.name)
                new_script = self.queue_template % self.queue + '\n' + new_script
            if new_script != contents:
                # Only switch to the temp file when we actually had to
                # modify the user's script.
                self._temp_file.write(new_script)
                self.template_file = self._temp_file.name
        else:
            # No template given: build the whole script from the defaults.
            default_script = self.default_template % n
            if self.queue:
                default_script = self.queue_template % self.queue + \
                    '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
                    (self.name, default_script))
            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        self._temp_file.file.flush()
        # Make the script executable before handing it to the scheduler.
        os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        # Close only the inner file object; keeping the NamedTemporaryFile
        # wrapper alive on self prevents the file from being unlinked while
        # the scheduler reads it.
        self._temp_file.file.close()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job via the scheduler's delete command."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
349
348
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet for the PBS batch system (qsub/qdel)."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = '\d+'
    job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
    job_array_template = '#PBS -t 1-%d'
    queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'
    # Default job script. The /bin/sh shebang is forced so the script does
    # not depend on the user's login shell; each array member computes its
    # zero-based engine id from PBS_ARRAYID and starts one ipengine.
    default_template="""#!/bin/sh
#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
366
366
class SGEEngineSet(PBSEngineSet):
    """BatchEngineSet for Sun Grid Engine.

    Inherits submit/delete commands (qsub/qdel) and job_id_regexp from
    PBSEngineSet; only the directive syntax differs.
    """

    name = 'SGE'
    job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'
    # Default job script. '#$ -S /bin/sh' forces sh as the job shell; each
    # array member computes its zero-based engine id from SGE_TASK_ID.
    default_template="""#$ -V
#$ -S /bin/sh
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
380
381
class LSFEngineSet(PBSEngineSet):
    """BatchEngineSet for LSF (bsub/bkill)."""

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
    job_array_template = '#BSUB -J ipengine[1-%d]'
    queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
    queue_template = '#BSUB -q %s'
    # Default job script. The /bin/sh shebang is forced so the script does
    # not depend on the user's login shell; each array member computes its
    # zero-based engine id from LSB_JOBINDEX.
    default_template="""#!/bin/sh
#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
394
396
# Shell helper copied to each remote host: runs its arguments in the
# background, discards their output, and echoes the new process's pid.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Shell helper that TERM-kills every ipengine owned by the current user.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
403
405
class SSHEngineSet(object):
    # Script templates written out as local helper files in __init__.
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host; return a
        deferred that fires once all hosts have been handled."""
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Optionally copy the furl file first, then copy the sshx helper
        # and launch the engines over ssh.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        # Copy the controller's engine furl into the host's security dir.
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        # Copy the local sshx helper script to the host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        print
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        # Run the remote sshx helper `count` times, once per engine.
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill the engines on every host; return a deferred that fires
        once all hosts have been handled."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        # Copy the engine-killer script to the host, then execute it there.
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        # Copy the local engine_killer script to the host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        # Execute the previously-copied engine_killer script over ssh.
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        # Log a failure from remote command execution (errback is
        # currently commented out in _killall).
        log.msg(r)
528
530
529 #-----------------------------------------------------------------------------
531 #-----------------------------------------------------------------------------
530 # Main functions for the different types of clusters
532 # Main functions for the different types of clusters
531 #-----------------------------------------------------------------------------
533 #-----------------------------------------------------------------------------
532
534
# TODO:
# The logic in these functions should be moved into classes like LocalCluster,
# MpirunCluster, PBSCluster, etc.  That would remove a lot of the duplication.
# The main functions would then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
538
540
539
541
def check_security(args, cont_args):
    """Decide whether the cluster can run with the requested security.

    Secure (SSL) mode requires pyOpenSSL.  If either the client or the
    engine connection is supposed to stay secure but crypto support is
    missing, log an error, stop the reactor and return False.  Otherwise
    forward the -x/-y flags to the controller via *cont_args* and return
    True.
    """
    insecure_client = args.x
    insecure_engine = args.y
    # Equivalent to: (not args.x or not args.y) and not have_crypto
    if not have_crypto and not (insecure_client and insecure_engine):
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if insecure_client:
        cont_args.append('-x')
    if insecure_engine:
        cont_args.append('-y')
    return True
553
555
554
556
def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files.

    Reuse (-r) only makes sense with fixed ports: without explicit
    --client-port/--engine-port values we log an error, stop the reactor
    and return False.  On success the reuse flags are appended to
    *cont_args* and True is returned.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    if args.client_port == 0 or args.engine_port == 0:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.append('--client-port=%i' % args.client_port)
    cont_args.append('--engine-port=%i' % args.engine_port)
    return True
568
570
569
571
def _err_and_stop(f):
    """Errback: log the failure *f* and halt the reactor (fatal error)."""
    log.err(f)
    reactor.stop()
574
576
575
577
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to write *furl_file*, then start the engines.

    Unless *reuse* is set, a stale FURL file is removed first so the
    poll below is guaranteed to see the freshly written one.
    """
    if not reuse and os.path.isfile(furl_file):
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
586
588
587
589
def main_local(args):
    """Start a controller plus engines as plain local subprocesses."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        engine_args = ['--logfile=%s' %
                       pjoin(args.logdir, 'ipengine%s-' % cont_pid)]
        eset = LocalEngineSet(extra_args=engine_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
621
623
622
624
def main_mpi(args):
    """Start a controller locally and engines under mpirun/mpiexec."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Build: <mpirun|mpiexec> -n N ipengine -l <logfile-prefix> [--mpi=...]
        raw_args = [args.cmd, '-n', str(args.n), 'ipengine', '-l',
                    pjoin(args.logdir, 'ipengine%s-' % cont_pid)]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start()

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
660
662
661
663
def main_pbs(args):
    """Start a controller locally and engines through a PBS batch queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # An empty script is allowed: PBSEngineSet generates a default one.
    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)

        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
694
696
def main_sge(args):
    """Start a controller locally and engines through an SGE batch queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # An empty script is allowed: SGEEngineSet generates a default one.
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript, args.sgequeue)

        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return sge_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
727
729
def main_lsf(args):
    """Start a controller locally and engines through an LSF batch queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # An empty script is allowed: LSFEngineSet generates a default one.
    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)

        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return lsf_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
760
762
761
763
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    # The clusterfile is executed as Python; its top-level names land in
    # this dict.  NOTE(review): exec'ing a user-supplied file runs
    # arbitrary code -- acceptable here since the user owns the file.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # dict.has_key() is deprecated (removed in Python 3); setdefault is
    # the equivalent one-step "set default if missing".
    clusterfile.setdefault('send_furl', False)

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)

        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return ssh_set.start(clusterfile['send_furl'])

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
804
806
805
807
def get_args():
    """Build the ipcluster command-line parser and parse sys.argv.

    A hidden parent parser carries the options shared by every cluster
    type; each subcommand adds its own options and binds ``func`` (and,
    for MPI, ``cmd``) via set_defaults so main() can dispatch on it.
    """
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r', action='store_true', dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port')
    base_parser.add_argument(
        '--client-port', type=int, dest='client_port', default=0,
        help='the port the controller will listen on for client connections')
    base_parser.add_argument(
        '--engine-port', type=int, dest='engine_port', default=0,
        help='the port the controller will listen on for engine connections')
    base_parser.add_argument(
        '-x', action='store_true', dest='x',
        help='turn off client security')
    base_parser.add_argument(
        '-y', action='store_true', dest='y',
        help='turn off engine security')
    base_parser.add_argument(
        "--logdir", type=str, dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(), 'log'))
    base_parser.add_argument(
        "-n", "--num", type=int, dest="n", default=2,
        help="the number of engines to start")

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser])
    parser_local.set_defaults(func=main_local)

    # mpirun and mpiexec are the same subcommand under two names.
    for mpi_cmd, other in (('mpirun', 'mpiexec'), ('mpiexec', 'mpirun')):
        parser_mpi = subparsers.add_parser(
            mpi_cmd,
            help='run a cluster using %s (%s also works)' % (mpi_cmd, other),
            parents=[base_parser])
        parser_mpi.add_argument(
            "--mpi", type=str,
            dest="mpi",  # Don't put a default here to allow no MPI support
            help="how to call MPI_Init (default=mpi4py)")
        parser_mpi.set_defaults(func=main_mpi, cmd=mpi_cmd)

    # The three batch systems expose identical options; only names differ.
    for name, article, tag, main_func in (
            ('pbs', 'a', 'PBS', main_pbs),
            ('sge', 'an', 'SGE', main_sge),
            ('lsf', 'an', 'LSF', main_lsf)):
        parser_batch = subparsers.add_parser(
            name,
            help='run %s %s cluster' % (article, name),
            parents=[base_parser])
        parser_batch.add_argument(
            '-s', '--%s-script' % name, type=str,
            dest='%sscript' % name,
            help='%s script template' % tag,
            default='')  # the EngineSet will create one if not specified
        parser_batch.add_argument(
            '-q', '--queue', type=str,
            dest='%squeue' % name,
            help='%s queue to use when starting the engines' % tag,
            default=None)
        parser_batch.set_defaults(func=main_func)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser])
    parser_ssh.add_argument(
        '--clusterfile', type=str, dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py')
    parser_ssh.add_argument(
        '--sshx', type=str, dest='sshx',
        help='sshx launcher helper')
    parser_ssh.set_defaults(func=main_ssh)

    return parser.parse_args()
993
995
def main():
    """Entry point: parse args, schedule the chosen starter, run the reactor."""
    args = get_args()
    # args.func is one of the main_* functions bound by get_args().
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()
999
1001
# Allow use both as a script and as an importable module.
if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now