ENH: ssh now allows sending all current environment args to remote ipengine
Satrajit Ghosh
@@ -1,1018 +1,1033 @@
#!/usr/bin/env python
# encoding: utf-8

"""Start an IPython cluster = (controller + engines)."""

#-----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

import os
import re
import sys
import signal
import stat
import tempfile
pjoin = os.path.join

from twisted.internet import reactor, defer
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.internet.utils import getProcessOutput
from twisted.python import failure, log

from IPython.external import argparse
from IPython.external import Itpl
from IPython.genutils import (
    get_ipython_dir,
    get_log_dir,
    get_security_dir,
    num_cpus
)
from IPython.kernel.fcutil import have_crypto

# Create various ipython directories if they don't exist.
# This must be done before IPython.kernel.config is imported.
from IPython.iplib import user_setup
if os.name == 'posix':
    rc_suffix = ''
else:
    rc_suffix = '.ini'
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
get_log_dir()
get_security_dir()

from IPython.kernel.config import config_manager as kernel_config_manager
from IPython.kernel.error import SecurityError, FileTimeoutError
from IPython.kernel.fcutil import have_crypto
from IPython.kernel.twistedutil import gatherBoth, wait_for_file
from IPython.kernel.util import printer

#-----------------------------------------------------------------------------
# General process handling code
#-----------------------------------------------------------------------------


class ProcessStateError(Exception):
    pass

class UnknownStatus(Exception):
    pass

class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        value = status.value
        if isinstance(value, ProcessDone):
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            self.process_launcher.fire_stop_deferred(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        log.msg(data)

    def errReceived(self, data):
        log.err(data)

class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
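
# Example (sketch, for illustration only): driving a ProcessLauncher from
# Twisted code.  'sleep' is an arbitrary command chosen for the example.
#
#   pl = ProcessLauncher(['sleep', '5'])
#   d = pl.start()                      # fires with the child's pid
#   d.addCallback(lambda pid: log.msg('started pid %i' % pid))
#   pl.get_stop_deferred().addCallback(
#       lambda code: log.msg('stopped with %r' % code))
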
#-----------------------------------------------------------------------------
# Code for launching controller and engines
#-----------------------------------------------------------------------------


class ControllerLauncher(ProcessLauncher):

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)


class EngineLauncher(ProcessLauncher):

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)


class LocalEngineSet(object):

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        dlist = []
        for i in range(n):
            print "starting engine:", i
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

class BatchEngineSet(object):

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''
    job_array_regexp = ''
    job_array_template = ''
    queue_regexp = ''
    queue_template = ''
    default_template = ''

    def __init__(self, template_file, queue, **kwargs):
        self.template_file = template_file
        self.queue = queue

    def parse_job_id(self, output):
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            contents = open(self.template_file, 'r').read()
            new_script = contents
            regex = re.compile(self.job_array_regexp)
            if not regex.search(contents):
                log.msg("adding job array settings to %s script" % self.name)
                new_script = self.job_array_template % n +'\n' + new_script
            print self.queue_regexp
            regex = re.compile(self.queue_regexp)
            print regex.search(contents)
            if self.queue and not regex.search(contents):
                log.msg("adding queue settings to %s script" % self.name)
                new_script = self.queue_template % self.queue + '\n' + new_script
            if new_script != contents:
                self._temp_file.write(new_script)
                self.template_file = self._temp_file.name
        else:
            default_script = self.default_template % n
            if self.queue:
                default_script = self.queue_template % self.queue + \
                    '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
                    (self.name, default_script))
            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        self._temp_file.file.close()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d

class PBSEngineSet(BatchEngineSet):

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = '\d+'
    job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
    job_array_template = '#PBS -t 1-%d'
    queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'
    default_template="""#!/bin/sh
#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""

class SGEEngineSet(PBSEngineSet):

    name = 'SGE'
    job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'
    default_template="""#$ -V
#$ -S /bin/sh
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""

class LSFEngineSet(PBSEngineSet):

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
    job_array_template = '#BSUB -J ipengine[1-%d]'
    queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
    queue_template = '#BSUB -q %s'
    default_template="""#!/bin/sh
#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
    bsub_wrapper="""#!/bin/sh
bsub < $1
"""

    def __init__(self, template_file, queue, **kwargs):
        self._bsub_wrapper = self._make_bsub_wrapper()
        self.submit_command = self._bsub_wrapper.name
        PBSEngineSet.__init__(self,template_file, queue, **kwargs)

    def _make_bsub_wrapper(self):
        bsub_wrapper = tempfile.NamedTemporaryFile()
        bsub_wrapper.write(self.bsub_wrapper)
        bsub_wrapper.file.close()
        os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        return bsub_wrapper

-sshx_template="""#!/bin/sh
-"$@" &> /dev/null &
-echo $!
-"""
+sshx_template_prefix="""#!/bin/sh
+"""
+sshx_template_suffix=""""$@" &> /dev/null &
+echo $!
+"""

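# For illustration: with copyenvs enabled, the sshx file assembled from the
# prefix/suffix templates above ends up looking like the following (the PATH
# value is hypothetical, and there is one export line per os.environ entry):
#
#   #!/bin/sh
#   export PATH=/usr/local/bin:/usr/bin:/bin
#   "$@" &> /dev/null &
#   echo $!
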
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""

class SSHEngineSet(object):
-    sshx_template=sshx_template
+    sshx_template_prefix=sshx_template_prefix
+    sshx_template_suffix=sshx_template_suffix
    engine_killer_template=engine_killer_template

-    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
+    def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engines (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to set up the environment properly. If copyenvs is set, the current
        environment's variables are exported into the sshx file so that they
        reach the remote ipengine processes.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
-            f.writelines(self.sshx_template)
+            f.writelines(self.sshx_template_prefix)
+            if copyenvs:
+                for key, val in os.environ.items():
+                    f.writelines('export %s=%s\n'%(key,val))
+            f.writelines(self.sshx_template_suffix)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        print
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        log.msg(r)
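
# Example (sketch): launching two engines on a hypothetical host 'node1',
# copying the furl file and the local environment across:
#
#   eset = SSHEngineSet({'node1': 2}, sshx=None, copyenvs=True)
#   d = eset.start(send_furl=True)   # scp furl + sshx script, then ssh ipengine
#   ...
#   d = eset.kill()                  # scp + run the engine killer remotely
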
#-----------------------------------------------------------------------------
# Main functions for the different types of clusters
#-----------------------------------------------------------------------------

# TODO:
# The logic in this code should be moved into classes like LocalCluster,
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.


def check_security(args, cont_args):
    """Check to see if we should run with SSL support."""
    if (not args.x or not args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True


def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files."""
    if args.r:
        cont_args.append('-r')
        if args.client_port == 0 or args.engine_port == 0:
            log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
            reactor.stop()
            return False
        cont_args.append('--client-port=%i' % args.client_port)
        cont_args.append('--engine-port=%i' % args.engine_port)
    return True


def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    log.err(f)
    reactor.stop()


def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to create FURL files and then start the engines."""
    if not reuse:
        if os.path.isfile(furl_file):
            os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d


def main_local(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def main_mpi(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def main_pbs(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)

def main_sge(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = sge_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)

def main_lsf(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = lsf_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    if not clusterfile.has_key('send_furl'):
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
-        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
+        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx,
+                               copyenvs=args.copyenvs)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
def get_args():
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '-s',
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default=''
    )
    parser_pbs.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='pbsqueue',
        help='PBS queue to use when starting the engines',
        default=None,
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '-s',
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
    parser_sge.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='sgequeue',
        help='SGE queue to use when starting the engines',
        default=None,
    )
    parser_sge.set_defaults(func=main_sge)

    parser_lsf = subparsers.add_parser(
        'lsf',
        help='run an lsf cluster',
        parents=[base_parser]
    )

    parser_lsf.add_argument(
        '-s',
        '--lsf-script',
        type=str,
        dest='lsfscript',
        help='LSF script template',
        default='' # LSFEngineSet will create one if not specified
    )

    parser_lsf.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='lsfqueue',
        help='LSF queue to use when starting the engines',
        default=None,
    )
    parser_lsf.set_defaults(func=main_lsf)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
+        '-e',
+        '--copyenvs',
+        action='store_true',
+        dest='copyenvs',
+        help='Copy current shell environment to remote location',
+        default=False,
+    )
+    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args

def main():
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()

if __name__ == '__main__':
    main()
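
A minimal usage sketch for the new flag, assuming working ssh keys and a
clusterfile.py of the shape documented in main_ssh (hostnames hypothetical):

    # clusterfile.py
    send_furl = True
    engines = {'node1.example.com': 2, 'node2.example.com': 4}

    ipcluster ssh --clusterfile=clusterfile.py --copyenvs

With --copyenvs (or -e), every variable in the local os.environ is exported
at the top of the generated sshx script before ipengine is launched remotely.
Note the values are written unquoted, so entries containing spaces or shell
metacharacters may not survive the transfer intact.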