##// END OF EJS Templates
Remove bash-ism and use standard posix redirect in sshx.sh script....
Fernando Perez -
Show More
@@ -1,1041 +1,1041 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import stat
21 import stat
22 import tempfile
22 import tempfile
23 pjoin = os.path.join
23 pjoin = os.path.join
24
24
25 from twisted.internet import reactor, defer
25 from twisted.internet import reactor, defer
26 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.protocol import ProcessProtocol
27 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.error import ProcessDone, ProcessTerminated
28 from twisted.internet.utils import getProcessOutput
28 from twisted.internet.utils import getProcessOutput
29 from twisted.python import failure, log
29 from twisted.python import failure, log
30
30
31 from IPython.external import argparse
31 from IPython.external import argparse
32 from IPython.external import Itpl
32 from IPython.external import Itpl
33 from IPython.genutils import (
33 from IPython.genutils import (
34 get_ipython_dir,
34 get_ipython_dir,
35 get_log_dir,
35 get_log_dir,
36 get_security_dir,
36 get_security_dir,
37 num_cpus
37 num_cpus
38 )
38 )
39 from IPython.kernel.fcutil import have_crypto
39 from IPython.kernel.fcutil import have_crypto
40
40
41 # Create various ipython directories if they don't exist.
41 # Create various ipython directories if they don't exist.
42 # This must be done before IPython.kernel.config is imported.
42 # This must be done before IPython.kernel.config is imported.
43 from IPython.iplib import user_setup
43 from IPython.iplib import user_setup
if os.name == 'posix':
    # On posix systems the rc files have no extension.
    rc_suffix = ''
else:
    # On Windows the rc files use a .ini extension.
    rc_suffix = '.ini'
# Install the default IPython profile files non-interactively; this must
# happen before IPython.kernel.config is imported (see comment above).
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
# Ensure the log and security directories exist as an import-time side effect.
get_log_dir()
get_security_dir()
51
51
52 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.config import config_manager as kernel_config_manager
53 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.error import SecurityError, FileTimeoutError
54 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.fcutil import have_crypto
55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
56 from IPython.kernel.util import printer
56 from IPython.kernel.util import printer
57
57
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59 # General process handling code
59 # General process handling code
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61
61
62
62
class ProcessStateError(Exception):
    """Raised when a ProcessLauncher operation is attempted in the wrong state."""
65
65
class UnknownStatus(Exception):
    """Raised when a child process ends with a status Twisted does not report."""
68
68
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Translates Twisted process events (spawn, exit, output) into calls on
    the owning ProcessLauncher's deferred-firing methods.
    """
    def __init__(self, process_launcher):
        # The ProcessLauncher that owns this protocol instance.
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child process has spawned; report its pid back to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        value = status.value
        if isinstance(value, ProcessDone):
            # Clean exit: report exit status 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            # Abnormal exit: report exit code, signal and raw status together.
            self.process_launcher.fire_stop_deferred(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward child stdout to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward child stderr to the Twisted log as errors.
        log.err(data)
98
98
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args is argv-style: element 0 is the executable and the
        # full list (including element 0) is passed as the argument vector.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Reset all per-run state; the lifecycle is before -> running -> after.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        # Called by LauncherProcessProtocol.connectionMade once spawned.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred firing with the pid.

        Fails with ProcessStateError if the process was already started.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires when the process stops.

        Multiple callers may each obtain their own deferred; all are fired
        by fire_stop_deferred. Fails if the process has already completed.
        """
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by LauncherProcessProtocol.processEnded; exit_code is 0 for
        # a clean exit or a dict with exit_code/signal/status otherwise.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        # Silently ignored unless the process is currently running.
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        # Graceful shutdown: SIGINT now, SIGKILL after `delay` seconds if the
        # process is still alive (signal() is a no-op once it has exited).
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
178
178
179
179
180 #-----------------------------------------------------------------------------
180 #-----------------------------------------------------------------------------
181 # Code for launching controller and engines
181 # Code for launching controller and engines
182 #-----------------------------------------------------------------------------
182 #-----------------------------------------------------------------------------
183
183
184
184
class ControllerLauncher(ProcessLauncher):
    """Launch ``ipcontroller`` as a child process."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # On Windows the ipcontroller entry point is not reliably
            # installed in a predictable location, so locate the module file
            # and run it with the current interpreter.  The -u flag turns on
            # unbuffered output, which is required on win32 to avoid odd
            # conflicts with Twisted.
            from IPython.kernel.scripts import ipcontroller
            script_path = ipcontroller.__file__.replace('.pyc', '.py')
            cmd = [sys.executable, '-u', script_path]
        else:
            cmd = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd.extend(extra_args)

        ProcessLauncher.__init__(self, cmd)
205
205
206
206
class EngineLauncher(ProcessLauncher):
    """Launch ``ipengine`` as a child process."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # On Windows the ipengine entry point is not reliably installed
            # in a predictable location, so locate the module file and run it
            # with the current interpreter.  The -u flag turns on unbuffered
            # output, which is required on win32 to avoid odd conflicts with
            # Twisted.
            from IPython.kernel.scripts import ipengine
            script_path = ipengine.__file__.replace('.pyc', '.py')
            cmd = [sys.executable, '-u', script_path]
        else:
            cmd = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd.extend(extra_args)

        ProcessLauncher.__init__(self, cmd)
227
227
228
228
class LocalEngineSet(object):
    """Start and manage a set of engines on the local machine."""

    def __init__(self, extra_args=None):
        # Extra command-line arguments passed on to every EngineLauncher.
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start n local engines; returns a deferred firing with their pids."""
        dlist = []
        for i in range(n):
            print "starting engine:", i
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        # Log the pids of all started engines and pass the result through.
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        # Log the stop results of all engines and pass the result through.
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send signal sig to every engine; returns a deferred for all stops."""
        dlist = []
        for el in self.launchers:
            # Grab the stop deferred *before* signalling so we never miss
            # the stop notification.
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """SIGINT every engine, then SIGKILL any survivors after delay seconds."""
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
274
274
class BatchEngineSet(object):
    """Base class for engine sets submitted through a batch queue system.

    Subclasses supply the scheduler-specific commands, directive regexps and
    script templates via the class attributes below.
    """

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''                # human-readable scheduler name, e.g. 'PBS'
    submit_command = ''      # command that submits a job script
    delete_command = ''      # command that deletes a submitted job
    job_id_regexp = ''       # regexp extracting the job id from submit output
    job_array_regexp = ''    # regexp detecting an existing job-array directive
    job_array_template = ''  # directive inserted when no job array is present
    queue_regexp = ''        # regexp detecting an existing queue directive
    queue_template = ''      # directive inserted when no queue is present
    default_template = ''    # full script used when no template file is given

    def __init__(self, template_file, queue, **kwargs):
        # template_file may be None/empty; start() then uses default_template.
        self.template_file = template_file
        self.queue = queue

    def parse_job_id(self, output):
        """Extract and store the job id from the submit command's output.

        Raises a generic Exception when job_id_regexp does not match.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        """Print and re-raise a failure from the submit command."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a job (array) that starts n engines.

        Returns a deferred that fires with the job id on success.
        """
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        # The submitted script must be executable for some schedulers.
        os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            contents = open(self.template_file, 'r').read()
            new_script = contents
            # Prepend a job-array directive unless the template has one.
            regex = re.compile(self.job_array_regexp)
            if not regex.search(contents):
                log.msg("adding job array settings to %s script" % self.name)
                new_script = self.job_array_template % n +'\n' + new_script
            # Prepend a queue directive unless the template has one.
            regex = re.compile(self.queue_regexp)
            if self.queue and not regex.search(contents):
                log.msg("adding queue settings to %s script" % self.name)
                new_script = self.queue_template % self.queue + '\n' + new_script
            if new_script != contents:
                self._temp_file.write(new_script)
                self.template_file = self._temp_file.name
        else:
            # No user template: build a complete script from the default.
            default_script = self.default_template % n
            if self.queue:
                default_script = self.queue_template % self.queue + \
                    '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
                    (self.name, default_script))
            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        self._temp_file.file.close()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the previously submitted job; returns the command deferred."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
348
348
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet for the PBS batch scheduler (qsub/qdel)."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw strings so the regex escapes (\d, \w, [ \t]) are not treated as
    # (invalid) string escape sequences; the values are unchanged.
    job_id_regexp = r'\d+'
    job_array_regexp = r'#PBS[ \t]+-t[ \t]+\d+'
    job_array_template = '#PBS -t 1-%d'
    queue_regexp = r'#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'
    default_template="""#!/bin/sh
#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
366
366
class SGEEngineSet(PBSEngineSet):
    """BatchEngineSet for Sun Grid Engine; reuses the PBS qsub/qdel commands."""

    name = 'SGE'
    # Raw strings so the regex escapes (\$, \d, \w) are not treated as
    # (invalid) string escape sequences; the values are unchanged.
    job_array_regexp = r'#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = r'#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'
    default_template="""#$ -V
#$ -S /bin/sh
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
381
381
class LSFEngineSet(PBSEngineSet):
    """BatchEngineSet for LSF (bsub/bkill).

    bsub reads the job script from stdin rather than taking a filename, so a
    small wrapper script is generated and used as the submit command.
    """

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    # Raw strings so the regex escapes (\w, \[, \d) are not treated as
    # (invalid) string escape sequences; the values are unchanged.
    # NOTE(review): '-J+' matches '-' followed by one or more 'J' — this
    # looks like it was meant to be '[ \t]+-J'; left as-is to preserve
    # existing matching behavior.
    job_array_regexp = r'#BSUB[ \t]-J+\w+\[\d+-\d+\]'
    job_array_template = '#BSUB -J ipengine[1-%d]'
    queue_regexp = r'#BSUB[ \t]+-q[ \t]+\w+'
    queue_template = '#BSUB -q %s'
    default_template="""#!/bin/sh
#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
    bsub_wrapper="""#!/bin/sh
bsub < $1
"""

    def __init__(self, template_file, queue, **kwargs):
        # Replace the plain 'bsub' submit command with the generated wrapper
        # that feeds the script to bsub on stdin.
        self._bsub_wrapper = self._make_bsub_wrapper()
        self.submit_command = self._bsub_wrapper.name
        PBSEngineSet.__init__(self,template_file, queue, **kwargs)

    def _make_bsub_wrapper(self):
        """Write the bsub wrapper to an executable temp file and return it.

        The NamedTemporaryFile object is returned (and stored) so the file
        survives for the lifetime of this engine set.
        """
        bsub_wrapper = tempfile.NamedTemporaryFile()
        bsub_wrapper.write(self.bsub_wrapper)
        bsub_wrapper.file.close()
        os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        return bsub_wrapper
411
411
# Shell script fragments used to build the per-user sshx.sh launcher script.
# The prefix is the shebang; exported environment variables may be written
# between prefix and suffix (see SSHEngineSet.__init__).
sshx_template_prefix="""#!/bin/sh
"""
# The suffix runs the passed command in the background, discarding output via
# the standard POSIX redirect ('> /dev/null 2>&1', not the bash-only '&>'),
# and echoes the background pid.
sshx_template_suffix=""""$@" > /dev/null 2>&1 &
echo $!
"""

# Script copied to remote hosts to kill all of the current user's ipengine
# processes with SIGTERM.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
421
421
def escape_strings(val):
    """Escape an environment-variable value for use in a shell script.

    Parentheses are backslash-escaped and values containing spaces are
    wrapped in double quotes, so the generated ``export`` lines parse
    correctly under ``sh``.

    Parameters
    ----------
    val : str
        The raw environment-variable value.

    Returns
    -------
    str
        The escaped (and possibly quoted) value.
    """
    # Raw strings: the replacement is a literal backslash followed by the
    # paren (the originals were invalid escape sequences in non-raw strings).
    val = val.replace('(', r'\(')
    val = val.replace(')', r'\)')
    if ' ' in val:
        # Quote the whole value so embedded spaces survive word-splitting.
        val = '"%s"' % val
    return val
428
428
429 class SSHEngineSet(object):
429 class SSHEngineSet(object):
430 sshx_template_prefix=sshx_template_prefix
430 sshx_template_prefix=sshx_template_prefix
431 sshx_template_suffix=sshx_template_suffix
431 sshx_template_suffix=sshx_template_suffix
432 engine_killer_template=engine_killer_template
432 engine_killer_template=engine_killer_template
433
433
    def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.

        If copyenvs is true, the local environment is exported into the
        generated sshx script.  ipengine is the command run remotely for
        each engine.  Requires the USER environment variable to be set.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template_prefix)
            if copyenvs:
                # Export the local environment (escaped for sh) so remote
                # engines see the same settings.
                for key, val in sorted(os.environ.items()):
                    newval = escape_strings(val)
                    f.writelines('export %s=%s\n'%(key,newval))
            f.writelines(self.sshx_template_suffix)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()
470
470
471 def start(self, send_furl=False):
471 def start(self, send_furl=False):
472 dlist = []
472 dlist = []
473 for host in self.engine_hosts.keys():
473 for host in self.engine_hosts.keys():
474 count = self.engine_hosts[host]
474 count = self.engine_hosts[host]
475 d = self._start(host, count, send_furl)
475 d = self._start(host, count, send_furl)
476 dlist.append(d)
476 dlist.append(d)
477 return gatherBoth(dlist, consumeErrors=True)
477 return gatherBoth(dlist, consumeErrors=True)
478
478
479 def _start(self, hostname, count=1, send_furl=False):
479 def _start(self, hostname, count=1, send_furl=False):
480 if send_furl:
480 if send_furl:
481 d = self._scp_furl(hostname)
481 d = self._scp_furl(hostname)
482 else:
482 else:
483 d = defer.succeed(None)
483 d = defer.succeed(None)
484 d.addCallback(lambda r: self._scp_sshx(hostname))
484 d.addCallback(lambda r: self._scp_sshx(hostname))
485 d.addCallback(lambda r: self._ssh_engine(hostname, count))
485 d.addCallback(lambda r: self._ssh_engine(hostname, count))
486 return d
486 return d
487
487
488 def _scp_furl(self, hostname):
488 def _scp_furl(self, hostname):
489 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
489 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
490 cmd_list = scp_cmd.split()
490 cmd_list = scp_cmd.split()
491 cmd_list[1] = os.path.expanduser(cmd_list[1])
491 cmd_list[1] = os.path.expanduser(cmd_list[1])
492 log.msg('Copying furl file: %s' % scp_cmd)
492 log.msg('Copying furl file: %s' % scp_cmd)
493 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
493 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
494 return d
494 return d
495
495
496 def _scp_sshx(self, hostname):
496 def _scp_sshx(self, hostname):
497 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
497 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
498 self.sshx, hostname,
498 self.sshx, hostname,
499 self.temp_dir, os.environ['USER']
499 self.temp_dir, os.environ['USER']
500 )
500 )
501 print
501 print
502 log.msg("Copying sshx: %s" % scp_cmd)
502 log.msg("Copying sshx: %s" % scp_cmd)
503 sshx_scp = scp_cmd.split()
503 sshx_scp = scp_cmd.split()
504 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
504 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
505 return d
505 return d
506
506
507 def _ssh_engine(self, hostname, count):
507 def _ssh_engine(self, hostname, count):
508 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
508 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
509 hostname, self.temp_dir,
509 hostname, self.temp_dir,
510 os.environ['USER'], self.engine_command
510 os.environ['USER'], self.engine_command
511 )
511 )
512 cmds = exec_engine.split()
512 cmds = exec_engine.split()
513 dlist = []
513 dlist = []
514 log.msg("about to start engines...")
514 log.msg("about to start engines...")
515 for i in range(count):
515 for i in range(count):
516 log.msg('Starting engines: %s' % exec_engine)
516 log.msg('Starting engines: %s' % exec_engine)
517 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
517 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
518 dlist.append(d)
518 dlist.append(d)
519 return gatherBoth(dlist, consumeErrors=True)
519 return gatherBoth(dlist, consumeErrors=True)
520
520
521 def kill(self):
521 def kill(self):
522 dlist = []
522 dlist = []
523 for host in self.engine_hosts.keys():
523 for host in self.engine_hosts.keys():
524 d = self._killall(host)
524 d = self._killall(host)
525 dlist.append(d)
525 dlist.append(d)
526 return gatherBoth(dlist, consumeErrors=True)
526 return gatherBoth(dlist, consumeErrors=True)
527
527
528 def _killall(self, hostname):
528 def _killall(self, hostname):
529 d = self._scp_engine_killer(hostname)
529 d = self._scp_engine_killer(hostname)
530 d.addCallback(lambda r: self._ssh_kill(hostname))
530 d.addCallback(lambda r: self._ssh_kill(hostname))
531 # d.addErrback(self._exec_err)
531 # d.addErrback(self._exec_err)
532 return d
532 return d
533
533
534 def _scp_engine_killer(self, hostname):
534 def _scp_engine_killer(self, hostname):
535 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
535 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
536 self.engine_killer,
536 self.engine_killer,
537 hostname,
537 hostname,
538 self.temp_dir,
538 self.temp_dir,
539 os.environ['USER']
539 os.environ['USER']
540 )
540 )
541 cmds = scp_cmd.split()
541 cmds = scp_cmd.split()
542 log.msg('Copying engine_killer: %s' % scp_cmd)
542 log.msg('Copying engine_killer: %s' % scp_cmd)
543 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
543 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
544 return d
544 return d
545
545
546 def _ssh_kill(self, hostname):
546 def _ssh_kill(self, hostname):
547 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
547 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
548 hostname,
548 hostname,
549 self.temp_dir,
549 self.temp_dir,
550 os.environ['USER']
550 os.environ['USER']
551 )
551 )
552 log.msg('Killing engine: %s' % kill_cmd)
552 log.msg('Killing engine: %s' % kill_cmd)
553 kill_cmd = kill_cmd.split()
553 kill_cmd = kill_cmd.split()
554 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
554 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
555 return d
555 return d
556
556
557 def _exec_err(self, r):
557 def _exec_err(self, r):
558 log.msg(r)
558 log.msg(r)
559
559
560 #-----------------------------------------------------------------------------
560 #-----------------------------------------------------------------------------
561 # Main functions for the different types of clusters
561 # Main functions for the different types of clusters
562 #-----------------------------------------------------------------------------
562 #-----------------------------------------------------------------------------
563
563
564 # TODO:
564 # TODO:
565 # The logic in these codes should be moved into classes like LocalCluster
565 # The logic in these codes should be moved into classes like LocalCluster
566 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
566 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
567 # The main functions should then just parse the command line arguments, create
567 # The main functions should then just parse the command line arguments, create
568 # the appropriate class and call a 'start' method.
568 # the appropriate class and call a 'start' method.
569
569
570
570
def check_security(args, cont_args):
    """Validate SSL settings and extend *cont_args* with -x/-y flags.

    Returns True when startup may proceed.  If security is requested on
    either channel but pyOpenSSL is unavailable, logs an error, stops
    the reactor and returns False.
    """
    security_wanted = not (args.x and args.y)
    if security_wanted and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    for enabled, flag in ((args.x, '-x'), (args.y, '-y')):
        if enabled:
            cont_args.append(flag)
    return True
584
584
585
585
def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files.

    When ``-r`` is given, both ``--client-port`` and ``--engine-port``
    must be non-zero; otherwise an error is logged, the reactor is
    stopped and False is returned.  Extends *cont_args* in place and
    returns True on success.  (Fixed docstring typo "resuse".)
    """
    if args.r:
        cont_args.append('-r')
        if args.client_port == 0 or args.engine_port == 0:
            log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
            reactor.stop()
            return False
        cont_args.append('--client-port=%i' % args.client_port)
        cont_args.append('--engine-port=%i' % args.engine_port)
    return True
599
599
600
600
def _err_and_stop(f):
    """Errback: log the failure *f* and halt the reactor (fatal error)."""
    log.err(f)
    reactor.stop()
605
605
606
606
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to write *furl_file*, then launch engines.

    When not reusing FURL files, a stale file is removed first so the
    wait genuinely observes the controller recreating it.
    """
    if not reuse and os.path.isfile(furl_file):
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
617
617
618
618
def main_local(args):
    """Start a controller plus engines as plain local processes."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early on bad security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        engine_args = [
            '--logfile=%s' % pjoin(args.logdir, 'ipengine%s-' % cont_pid)
        ]
        eset = LocalEngineSet(extra_args=engine_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
652
652
653
653
def main_mpi(args):
    """Start a controller locally and engines under mpirun/mpiexec."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early on bad security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # <mpirun|mpiexec> -n N ipengine -l <logdir>/ipengine<pid>- [--mpi=...]
        raw_args = [
            args.cmd,
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir, 'ipengine%s-' % cont_pid),
        ]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start()

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
691
691
692
692
def main_pbs(args):
    """Start a controller locally and engines through a PBS queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early on bad security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return
    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)

        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
725
725
def main_sge(args):
    """Start a controller locally and engines through an SGE queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early on bad security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript, args.sgequeue)

        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return sge_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
758
758
def main_lsf(args):
    """Start a controller locally and engines through an LSF queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early on bad security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return
    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)

        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return lsf_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
791
791
792
792
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # Default send_furl to False when the clusterfile does not set it.
    # setdefault replaces the deprecated dict.has_key() test.
    clusterfile.setdefault('send_furl', False)

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx,
                               copyenvs=args.copyenvs)

        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return ssh_set.start(clusterfile['send_furl'])

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
836
836
837
837
def get_args():
    """Build the ipcluster command-line parser and parse sys.argv."""
    # Options shared by every cluster type.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r', action='store_true', dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port')
    base_parser.add_argument(
        '--client-port', type=int, dest='client_port', default=0,
        help='the port the controller will listen on for client connections')
    base_parser.add_argument(
        '--engine-port', type=int, dest='engine_port', default=0,
        help='the port the controller will listen on for engine connections')
    base_parser.add_argument(
        '-x', action='store_true', dest='x',
        help='turn off client security')
    base_parser.add_argument(
        '-y', action='store_true', dest='y',
        help='turn off engine security')
    base_parser.add_argument(
        "--logdir", type=str, dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(), 'log'))
    base_parser.add_argument(
        "-n", "--num", type=int, dest="n", default=2,
        help="the number of engines to start")

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local', help='run a local cluster', parents=[base_parser])
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun', help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser])
    parser_mpirun.add_argument(
        "--mpi", type=str, dest="mpi",  # no default so MPI support stays optional
        help="how to call MPI_Init (default=mpi4py)")
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec', help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser])
    parser_mpiexec.add_argument(
        "--mpi", type=str, dest="mpi",  # no default so MPI support stays optional
        help="how to call MPI_Init (default=mpi4py)")
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    # The three batch systems share the same option shape; only the
    # prefix, article in the help text and handler function differ.
    for name, article, func in (('pbs', 'a', main_pbs),
                                ('sge', 'an', main_sge),
                                ('lsf', 'an', main_lsf)):
        up = name.upper()
        sub = subparsers.add_parser(
            name, help='run %s %s cluster' % (article, name),
            parents=[base_parser])
        sub.add_argument(
            '-s', '--%s-script' % name, type=str, dest='%sscript' % name,
            help='%s script template' % up,
            default='')  # the EngineSet will create one if not specified
        sub.add_argument(
            '-q', '--queue', type=str, dest='%squeue' % name,
            help='%s queue to use when starting the engines' % up,
            default=None)
        sub.set_defaults(func=func)

    parser_ssh = subparsers.add_parser(
        'ssh', help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser])
    parser_ssh.add_argument(
        '-e', '--copyenvs', action='store_true', dest='copyenvs',
        default=False,
        help='Copy current shell environment to remote location')
    parser_ssh.add_argument(
        '--clusterfile', type=str, dest='clusterfile',
        default='clusterfile.py',
        help='python file describing the cluster')
    parser_ssh.add_argument(
        '--sshx', type=str, dest='sshx',
        help='sshx launcher helper')
    parser_ssh.set_defaults(func=main_ssh)

    return parser.parse_args()
1033
1033
def main():
    """Parse arguments, schedule the chosen cluster starter, run the reactor."""
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()
1039
1039
# Script entry point.
if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now