##// END OF EJS Templates
add bsub wrapper for LSFEngineSet...
Justin Riley -
Show More
@@ -1,1003 +1,1018 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import stat
21 import stat
22 import tempfile
22 import tempfile
23 pjoin = os.path.join
23 pjoin = os.path.join
24
24
25 from twisted.internet import reactor, defer
25 from twisted.internet import reactor, defer
26 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.protocol import ProcessProtocol
27 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.error import ProcessDone, ProcessTerminated
28 from twisted.internet.utils import getProcessOutput
28 from twisted.internet.utils import getProcessOutput
29 from twisted.python import failure, log
29 from twisted.python import failure, log
30
30
31 from IPython.external import argparse
31 from IPython.external import argparse
32 from IPython.external import Itpl
32 from IPython.external import Itpl
33 from IPython.genutils import (
33 from IPython.genutils import (
34 get_ipython_dir,
34 get_ipython_dir,
35 get_log_dir,
35 get_log_dir,
36 get_security_dir,
36 get_security_dir,
37 num_cpus
37 num_cpus
38 )
38 )
39 from IPython.kernel.fcutil import have_crypto
39 from IPython.kernel.fcutil import have_crypto
40
40
41 # Create various ipython directories if they don't exist.
41 # Create various ipython directories if they don't exist.
42 # This must be done before IPython.kernel.config is imported.
42 # This must be done before IPython.kernel.config is imported.
43 from IPython.iplib import user_setup
43 from IPython.iplib import user_setup
44 if os.name == 'posix':
44 if os.name == 'posix':
45 rc_suffix = ''
45 rc_suffix = ''
46 else:
46 else:
47 rc_suffix = '.ini'
47 rc_suffix = '.ini'
48 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
49 get_log_dir()
49 get_log_dir()
50 get_security_dir()
50 get_security_dir()
51
51
52 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.config import config_manager as kernel_config_manager
53 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.error import SecurityError, FileTimeoutError
54 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.fcutil import have_crypto
55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
56 from IPython.kernel.util import printer
56 from IPython.kernel.util import printer
57
57
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59 # General process handling code
59 # General process handling code
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61
61
62
62
class ProcessStateError(Exception):
    """Raised when a process operation is attempted in an invalid state."""
65
65
class UnknownStatus(Exception):
    """Raised when a child process exits with an unrecognized status."""
68
68
class LauncherProcessProtocol(ProcessProtocol):
    """Relay child-process lifecycle events back to a ProcessLauncher.

    Twisted invokes the callbacks below as the spawned process starts,
    produces output, and exits; each event is forwarded to the owning
    launcher (or mirrored into the twisted log).
    """

    def __init__(self, process_launcher):
        # The ProcessLauncher that owns this protocol instance.
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive: hand its pid to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit maps to an exit code of 0.
            self.process_launcher.fire_stop_deferred(0)
            return
        if isinstance(reason, ProcessTerminated):
            # Abnormal exit: report code, signal and raw status together.
            info = {'exit_code':reason.exitCode,
                    'signal':reason.signal,
                    'status':reason.status
                    }
            self.process_launcher.fire_stop_deferred(info)
            return
        raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Mirror the child's stdout into the twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Mirror the child's stderr into the twisted log as errors.
        log.err(data)
98
98
class ProcessLauncher(object):
    """Start and stop an external process in an asynchronous manner.

    The launcher moves through three states: 'before' -> 'running' ->
    'after'.  State changes are currently reported through deferreds,
    which is an awkward design that should eventually move to a formal
    NotificationCenter.
    """

    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the whole list becomes argv.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        """Return all process-tracking state to its pristine values."""
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # one of: before, running, after

    @property
    def running(self):
        """True while the child process is alive."""
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        """Record the child's pid and fire the start deferred with it."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; return a deferred that fires with its pid.

        Fails with ProcessStateError if the process was already started.
        """
        if self.state != 'before':
            msg = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(msg))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """Return a deferred that fires once the process has stopped.

        Fails with ProcessStateError if the process already completed.
        """
        if self.state not in ('running', 'before'):
            return defer.fail(
                ProcessStateError('this process is already complete'))
        d = defer.Deferred()
        self.stop_deferreds.append(d)
        return d

    def fire_stop_deferred(self, exit_code):
        """Mark the process finished and fire every registered stop deferred."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Silently does nothing unless the process is running.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now, then follow up with KILL after `delay` seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
178
178
179
179
180 #-----------------------------------------------------------------------------
180 #-----------------------------------------------------------------------------
181 # Code for launching controller and engines
181 # Code for launching controller and engines
182 #-----------------------------------------------------------------------------
182 #-----------------------------------------------------------------------------
183
183
184
184
class ControllerLauncher(ProcessLauncher):
    """Launch an ipcontroller process, with optional extra arguments."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipcontroller script doesn't always get installed in the
            # same way or in the same location, so locate it through its
            # module and run it with the current interpreter.  The -u flag
            # turns on unbuffered output, which is required on Win32 to
            # prevent weird conflicts and problems with Twisted.
            from IPython.kernel.scripts import ipcontroller
            script = ipcontroller.__file__.replace('.pyc', '.py')
            cmd_line = [sys.executable, '-u', script]
        else:
            cmd_line = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd_line.extend(extra_args)
        ProcessLauncher.__init__(self, cmd_line)
205
205
206
206
class EngineLauncher(ProcessLauncher):
    """Launch an ipengine process, with optional extra arguments."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipengine script doesn't always get installed in the same
            # way or in the same location, so locate it through its module
            # and run it with the current interpreter.  The -u flag turns
            # on unbuffered output, which is required on Win32 to prevent
            # weird conflicts and problems with Twisted.
            from IPython.kernel.scripts import ipengine
            script = ipengine.__file__.replace('.pyc', '.py')
            cmd_line = [sys.executable, '-u', script]
        else:
            cmd_line = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd_line.extend(extra_args)
        ProcessLauncher.__init__(self, cmd_line)
227
227
228
228
229 class LocalEngineSet(object):
229 class LocalEngineSet(object):
230
230
231 def __init__(self, extra_args=None):
231 def __init__(self, extra_args=None):
232 self.extra_args = extra_args
232 self.extra_args = extra_args
233 self.launchers = []
233 self.launchers = []
234
234
235 def start(self, n):
235 def start(self, n):
236 dlist = []
236 dlist = []
237 for i in range(n):
237 for i in range(n):
238 print "starting engine:", i
238 print "starting engine:", i
239 el = EngineLauncher(extra_args=self.extra_args)
239 el = EngineLauncher(extra_args=self.extra_args)
240 d = el.start()
240 d = el.start()
241 self.launchers.append(el)
241 self.launchers.append(el)
242 dlist.append(d)
242 dlist.append(d)
243 dfinal = gatherBoth(dlist, consumeErrors=True)
243 dfinal = gatherBoth(dlist, consumeErrors=True)
244 dfinal.addCallback(self._handle_start)
244 dfinal.addCallback(self._handle_start)
245 return dfinal
245 return dfinal
246
246
247 def _handle_start(self, r):
247 def _handle_start(self, r):
248 log.msg('Engines started with pids: %r' % r)
248 log.msg('Engines started with pids: %r' % r)
249 return r
249 return r
250
250
251 def _handle_stop(self, r):
251 def _handle_stop(self, r):
252 log.msg('Engines received signal: %r' % r)
252 log.msg('Engines received signal: %r' % r)
253 return r
253 return r
254
254
255 def signal(self, sig):
255 def signal(self, sig):
256 dlist = []
256 dlist = []
257 for el in self.launchers:
257 for el in self.launchers:
258 d = el.get_stop_deferred()
258 d = el.get_stop_deferred()
259 dlist.append(d)
259 dlist.append(d)
260 el.signal(sig)
260 el.signal(sig)
261 dfinal = gatherBoth(dlist, consumeErrors=True)
261 dfinal = gatherBoth(dlist, consumeErrors=True)
262 dfinal.addCallback(self._handle_stop)
262 dfinal.addCallback(self._handle_stop)
263 return dfinal
263 return dfinal
264
264
265 def interrupt_then_kill(self, delay=1.0):
265 def interrupt_then_kill(self, delay=1.0):
266 dlist = []
266 dlist = []
267 for el in self.launchers:
267 for el in self.launchers:
268 d = el.get_stop_deferred()
268 d = el.get_stop_deferred()
269 dlist.append(d)
269 dlist.append(d)
270 el.interrupt_then_kill(delay)
270 el.interrupt_then_kill(delay)
271 dfinal = gatherBoth(dlist, consumeErrors=True)
271 dfinal = gatherBoth(dlist, consumeErrors=True)
272 dfinal.addCallback(self._handle_stop)
272 dfinal.addCallback(self._handle_stop)
273 return dfinal
273 return dfinal
274
274
class BatchEngineSet(object):
    """Base class for engine sets submitted through a batch scheduler.

    Subclasses fill in the class attributes below with their scheduler's
    commands, directive regexes and script templates.  See
    PBSEngineSet/SGEEngineSet for examples.
    """

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''
    job_array_regexp = ''
    job_array_template = ''
    queue_regexp = ''
    queue_template = ''
    default_template = ''

    def __init__(self, template_file, queue, **kwargs):
        # template_file may be None/empty, in which case default_template
        # is used when the job is started.
        self.template_file = template_file
        self.queue = queue

    def parse_job_id(self, output):
        """Extract and remember the job id from the submit command's output.

        Raises an Exception when job_id_regexp finds no match.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise Exception("job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        """Print and re-raise a failure from the submit command."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a batch job that starts `n` engines.

        If a template file was supplied, job-array and queue directives
        are injected when missing; otherwise the class default template
        is used.  Returns a deferred firing with the parsed job id.
        """
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            # Read the user's template, closing the handle promptly
            # (the original leaked the file object).
            template = open(self.template_file, 'r')
            try:
                contents = template.read()
            finally:
                template.close()
            new_script = contents
            if not re.search(self.job_array_regexp, contents):
                log.msg("adding job array settings to %s script" % self.name)
                new_script = self.job_array_template % n + '\n' + new_script
            if self.queue and not re.search(self.queue_regexp, contents):
                log.msg("adding queue settings to %s script" % self.name)
                new_script = self.queue_template % self.queue + '\n' + new_script
            if new_script != contents:
                # Only submit the patched copy if something changed.
                self._temp_file.file.write(new_script)
                self.template_file = self._temp_file.name
        else:
            default_script = self.default_template % n
            if self.queue:
                default_script = self.queue_template % self.queue + \
                    '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
                    (self.name, default_script))
            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        # Close the underlying file object directly so the script is
        # flushed to disk but the named file is NOT unlinked (closing the
        # NamedTemporaryFile wrapper itself would delete it).
        self._temp_file.file.close()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job via the scheduler's delete command."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d
348
348
class PBSEngineSet(BatchEngineSet):
    """Engine set submitted through the PBS batch scheduler."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = r'\d+'
    job_array_regexp = r'#PBS[ \t]+-t[ \t]+\d+'
    job_array_template = '#PBS -t 1-%d'
    queue_regexp = r'#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'
    # Script used when the caller supplies no template of their own;
    # %d is filled with the number of engines.
    default_template = """#!/bin/sh
#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
366
366
class SGEEngineSet(PBSEngineSet):
    """Engine set submitted through the Sun Grid Engine scheduler."""

    name = 'SGE'
    job_array_regexp = r'#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = r'#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'
    # Script used when the caller supplies no template of their own;
    # %d is filled with the number of engines.
    default_template = """#$ -V
#$ -S /bin/sh
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
381
381
class LSFEngineSet(PBSEngineSet):
    """Engine set submitted through the LSF batch scheduler.

    bsub reads #BSUB directives only from a script supplied on stdin, so
    submission goes through a small wrapper script that runs
    ``bsub < script`` (see _make_bsub_wrapper).
    """

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    # BUGFIX: the previous pattern '#BSUB[ \t]-J+\w+\[\d+-\d+\]' never
    # matched a real '#BSUB -J name[1-n]' line (it allowed no whitespace
    # after -J, and the '+' repeated the J), so user-supplied scripts that
    # already had a job array got a duplicate directive prepended.
    job_array_regexp = r'#BSUB[ \t]+-J[ \t]+\w+\[\d+-\d+\]'
    job_array_template = '#BSUB -J ipengine[1-%d]'
    queue_regexp = r'#BSUB[ \t]+-q[ \t]+\w+'
    queue_template = '#BSUB -q %s'
    # Script used when the caller supplies no template of their own;
    # %d is filled with the number of engines.
    default_template = """#!/bin/sh
#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
    bsub_wrapper = """#!/bin/sh
bsub < $1
"""

    def __init__(self, template_file, queue, **kwargs):
        # Route submission through the wrapper so the script reaches
        # bsub on stdin rather than as a command-line argument.
        self._bsub_wrapper = self._make_bsub_wrapper()
        self.submit_command = self._bsub_wrapper.name
        PBSEngineSet.__init__(self, template_file, queue, **kwargs)

    def _make_bsub_wrapper(self):
        """Write an executable temp script that feeds its argument to bsub."""
        bsub_wrapper = tempfile.NamedTemporaryFile()
        bsub_wrapper.write(self.bsub_wrapper)
        # Close the underlying file object directly so the wrapper is
        # flushed but NOT unlinked (closing the NamedTemporaryFile wrapper
        # itself would delete the file).
        bsub_wrapper.file.close()
        os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        return bsub_wrapper
396
411
# Helper script copied to each remote host: runs its arguments detached
# from the terminal and reports the background pid.
sshx_template = """#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Helper script that terminates every ipengine owned by the current user.
engine_killer_template = """#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
405
420
class SSHEngineSet(object):
    """Start and kill ipengine processes on remote hosts over ssh."""

    sshx_template = sshx_template
    engine_killer_template = engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """
        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        """Start all engines; return a deferred gathering the results."""
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        """Start `count` engines on one host, optionally copying the furl first."""
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        """Copy the engine furl file into the remote host's security dir."""
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # scp itself won't expand '~' when invoked without a shell.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        """Copy the sshx helper script to the remote host."""
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        # (A stray debug `print` that emitted a blank line here has been
        # removed.)
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        """Launch `count` ipengine processes on `hostname` via ssh."""
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill every engine on every configured host."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        """Copy the engine-killer script to `hostname` and run it there."""
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        """Copy the engine-killer script to the remote host."""
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        """Run the engine-killer script on the remote host."""
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        # Currently unused (see commented errback above); kept for debugging.
        log.msg(r)
530
545
#-----------------------------------------------------------------------------
# Main functions for the different types of clusters
#-----------------------------------------------------------------------------

# TODO:
# The logic in these codes should be moved into classes like LocalCluster
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
def check_security(args, cont_args):
    """Check to see if we should run with SSL support.

    If either security flag is left on but pyOpenSSL is unavailable, log an
    error, stop the reactor, and return False.  Otherwise append '-x'/'-y'
    to `cont_args` as requested and return True.
    """
    if (not args.x or not args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True
555
570
556
571
def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files.

    When `args.r` is set, both --client-port and --engine-port must be
    nonzero; otherwise log an error, stop the reactor, and return False.
    On success the relevant controller flags are appended to `cont_args`.
    """
    if args.r:
        cont_args.append('-r')
        if args.client_port == 0 or args.engine_port == 0:
            log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
            reactor.stop()
            return False
        cont_args.append('--client-port=%i' % args.client_port)
        cont_args.append('--engine-port=%i' % args.engine_port)
    return True
570
585
571
586
def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    log.err(f)
    reactor.stop()
576
591
577
592
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to create its FURL file, then start the engines.

    When not reusing FURL files, any stale `furl_file` is removed first so
    that its reappearance signals the controller is ready.  Polls for the
    file (0.2s delay, 50 tries) and then invokes `start_engines(cont_pid)`.
    """
    if not reuse:
        if os.path.isfile(furl_file):
            os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
588
603
589
604
def main_local(args):
    """Start a controller and engines as plain local subprocesses."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Per-controller engine log prefix so repeated runs don't collide.
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
623
638
624
639
def main_mpi(args):
    """Start a controller locally and launch engines under mpirun/mpiexec."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # args.cmd is 'mpirun' or 'mpiexec' (set by the subparser defaults).
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
662
677
663
678
def main_pbs(args):
    """Start a controller locally and submit engines through a PBS queue."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # An explicitly given script template must exist; an empty value means
    # PBSEngineSet generates one itself.
    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
696
711
def main_sge(args):
    """Start a controller locally and submit engines through an SGE queue."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # An explicitly given script template must exist; an empty value means
    # SGEEngineSet generates one itself.
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = sge_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
729
744
def main_lsf(args):
    """Start a controller locally and submit engines through an LSF queue."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # An explicitly given script template must exist; an empty value means
    # LSFEngineSet generates one itself.
    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = lsf_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
762
777
763
778
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    # Python 2 idioms (execfile, dict.has_key) — this file predates Python 3.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    if not clusterfile.has_key('send_furl'):
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
806
821
807
822
def get_args():
    """Build the ipcluster argparse parser (one subcommand per cluster type)
    and return the parsed command-line arguments.

    Common options (-r, ports, -x/-y, --logdir, -n) live on a shared base
    parser that each subparser inherits via ``parents=``.
    """
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '-s',
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default=''
    )
    parser_pbs.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='pbsqueue',
        help='PBS queue to use when starting the engines',
        default=None,
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '-s',
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
    parser_sge.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='sgequeue',
        help='SGE queue to use when starting the engines',
        default=None,
    )
    parser_sge.set_defaults(func=main_sge)

    parser_lsf = subparsers.add_parser(
        'lsf',
        help='run an lsf cluster',
        parents=[base_parser]
    )

    parser_lsf.add_argument(
        '-s',
        '--lsf-script',
        type=str,
        dest='lsfscript',
        help='LSF script template',
        default='' # LSFEngineSet will create one if not specified
    )

    parser_lsf.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='lsfqueue',
        help='LSF queue to use when starting the engines',
        default=None,
    )
    parser_lsf.set_defaults(func=main_lsf)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
995
1010
def main():
    """Parse the command line, schedule the selected cluster starter on the
    reactor, enable logging to stdout, and run the reactor until stopped."""
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()
1001
1016
if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now