##// END OF EJS Templates
Fixes for LSF
Matthieu Brucher -
Show More
@@ -1,998 +1,1001 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import stat
21 import tempfile
22 import tempfile
22 pjoin = os.path.join
23 pjoin = os.path.join
23
24
24 from twisted.internet import reactor, defer
25 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
28 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
29 from twisted.python import failure, log
29
30
30 from IPython.external import argparse
31 from IPython.external import argparse
31 from IPython.external import Itpl
32 from IPython.external import Itpl
32 from IPython.genutils import (
33 from IPython.genutils import (
33 get_ipython_dir,
34 get_ipython_dir,
34 get_log_dir,
35 get_log_dir,
35 get_security_dir,
36 get_security_dir,
36 num_cpus
37 num_cpus
37 )
38 )
38 from IPython.kernel.fcutil import have_crypto
39 from IPython.kernel.fcutil import have_crypto
39
40
40 # Create various ipython directories if they don't exist.
41 # Create various ipython directories if they don't exist.
41 # This must be done before IPython.kernel.config is imported.
42 # This must be done before IPython.kernel.config is imported.
42 from IPython.iplib import user_setup
43 from IPython.iplib import user_setup
43 if os.name == 'posix':
44 if os.name == 'posix':
44 rc_suffix = ''
45 rc_suffix = ''
45 else:
46 else:
46 rc_suffix = '.ini'
47 rc_suffix = '.ini'
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 get_log_dir()
49 get_log_dir()
49 get_security_dir()
50 get_security_dir()
50
51
51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
56 from IPython.kernel.util import printer
56
57
57 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
58 # General process handling code
59 # General process handling code
59 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
60
61
61
62
class ProcessStateError(Exception):
    """Raised when an operation is attempted in an invalid process state."""
    pass
64
65
class UnknownStatus(Exception):
    """Raised when a process exit status cannot be interpreted."""
    pass
67
68
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Relays child-process lifecycle events back to its ProcessLauncher.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive; hand its pid back to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        value = status.value
        if isinstance(value, ProcessDone):
            # Clean exit: report a zero exit code.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            # Abnormal exit: forward the full exit information.
            self.process_launcher.fire_stop_deferred(
                {'exit_code': value.exitCode,
                 'signal': value.signal,
                 'status': value.status}
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Mirror the child's stdout into our log.
        log.msg(data)

    def errReceived(self, data):
        # Mirror the child's stderr into our log as errors.
        log.err(data)
97
98
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """

    def __init__(self, cmd_and_args):
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Fresh bookkeeping for a not-yet-started process.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        # Called by the protocol once the child has been spawned.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred that fires with its pid."""
        if self.state != 'before':
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """Return a deferred that fires when the process stops."""
        if self.state in ('running', 'before'):
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        return defer.fail(ProcessStateError('this process is already complete'))

    def fire_stop_deferred(self, exit_code):
        # Called by the protocol when the child exits; notify all waiters.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now, then KILL after `delay` seconds if still alive."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
177
178
178
179
179 #-----------------------------------------------------------------------------
180 #-----------------------------------------------------------------------------
180 # Code for launching controller and engines
181 # Code for launching controller and engines
181 #-----------------------------------------------------------------------------
182 #-----------------------------------------------------------------------------
182
183
183
184
class ControllerLauncher(ProcessLauncher):
    """Launch an ipcontroller process, resolving the script per platform."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipcontroller script doesn't always get installed in the
            # same way or in the same location, so resolve it from the
            # package itself.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # -u turns on unbuffered output, which is required on Win32 to
            # prevent weird conflicts and problems with Twisted.  Using
            # sys.executable makes sure we pick up the right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
204
205
205
206
class EngineLauncher(ProcessLauncher):
    """Launch a single ipengine process, resolving the script per platform."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipengine script doesn't always get installed in the same
            # way or in the same location, so resolve it from the package.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # -u turns on unbuffered output, which is required on Win32 to
            # prevent weird conflicts and problems with Twisted.  Using
            # sys.executable makes sure we pick up the right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
226
227
227
228
228 class LocalEngineSet(object):
229 class LocalEngineSet(object):
229
230
230 def __init__(self, extra_args=None):
231 def __init__(self, extra_args=None):
231 self.extra_args = extra_args
232 self.extra_args = extra_args
232 self.launchers = []
233 self.launchers = []
233
234
234 def start(self, n):
235 def start(self, n):
235 dlist = []
236 dlist = []
236 for i in range(n):
237 for i in range(n):
237 print "starting engine:", i
238 print "starting engine:", i
238 el = EngineLauncher(extra_args=self.extra_args)
239 el = EngineLauncher(extra_args=self.extra_args)
239 d = el.start()
240 d = el.start()
240 self.launchers.append(el)
241 self.launchers.append(el)
241 dlist.append(d)
242 dlist.append(d)
242 dfinal = gatherBoth(dlist, consumeErrors=True)
243 dfinal = gatherBoth(dlist, consumeErrors=True)
243 dfinal.addCallback(self._handle_start)
244 dfinal.addCallback(self._handle_start)
244 return dfinal
245 return dfinal
245
246
246 def _handle_start(self, r):
247 def _handle_start(self, r):
247 log.msg('Engines started with pids: %r' % r)
248 log.msg('Engines started with pids: %r' % r)
248 return r
249 return r
249
250
250 def _handle_stop(self, r):
251 def _handle_stop(self, r):
251 log.msg('Engines received signal: %r' % r)
252 log.msg('Engines received signal: %r' % r)
252 return r
253 return r
253
254
254 def signal(self, sig):
255 def signal(self, sig):
255 dlist = []
256 dlist = []
256 for el in self.launchers:
257 for el in self.launchers:
257 d = el.get_stop_deferred()
258 d = el.get_stop_deferred()
258 dlist.append(d)
259 dlist.append(d)
259 el.signal(sig)
260 el.signal(sig)
260 dfinal = gatherBoth(dlist, consumeErrors=True)
261 dfinal = gatherBoth(dlist, consumeErrors=True)
261 dfinal.addCallback(self._handle_stop)
262 dfinal.addCallback(self._handle_stop)
262 return dfinal
263 return dfinal
263
264
264 def interrupt_then_kill(self, delay=1.0):
265 def interrupt_then_kill(self, delay=1.0):
265 dlist = []
266 dlist = []
266 for el in self.launchers:
267 for el in self.launchers:
267 d = el.get_stop_deferred()
268 d = el.get_stop_deferred()
268 dlist.append(d)
269 dlist.append(d)
269 el.interrupt_then_kill(delay)
270 el.interrupt_then_kill(delay)
270 dfinal = gatherBoth(dlist, consumeErrors=True)
271 dfinal = gatherBoth(dlist, consumeErrors=True)
271 dfinal.addCallback(self._handle_stop)
272 dfinal.addCallback(self._handle_stop)
272 return dfinal
273 return dfinal
273
274
class BatchEngineSet(object):
    """Base class for engine sets launched through a batch queue system.

    Subclasses must fill in the class attributes below.  See
    PBSEngineSet/SGEEngineSet for examples.
    """

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''
    job_array_regexp = ''
    job_array_template = ''
    queue_regexp = ''
    queue_template = ''
    default_template = ''

    def __init__(self, template_file, queue, **kwargs):
        # template_file may be empty/None, in which case default_template
        # is used.  Extra keyword arguments are accepted (and ignored) for
        # backward compatibility with existing callers.
        self.template_file = template_file
        self.queue = queue

    def parse_job_id(self, output):
        """Extract and record the job id from the submit command's output."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        # Log the failure, then re-raise so callers still see the error.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a job array of n engines to the batch system.

        Returns a deferred that fires with the job id.
        """
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            # Read the user's template, closing the handle when done
            # (the original leaked this file object).
            template = open(self.template_file, 'r')
            try:
                contents = template.read()
            finally:
                template.close()
            new_script = contents
            # Prepend a job-array directive only if the script lacks one.
            regex = re.compile(self.job_array_regexp)
            if not regex.search(contents):
                log.msg("adding job array settings to %s script" % self.name)
                new_script = self.job_array_template % n + '\n' + new_script
            # Likewise for the queue directive, when a queue was requested.
            regex = re.compile(self.queue_regexp)
            if self.queue and not regex.search(contents):
                log.msg("adding queue settings to %s script" % self.name)
                new_script = self.queue_template % self.queue + '\n' + new_script
            if new_script != contents:
                self._temp_file.write(new_script)
                self.template_file = self._temp_file.name
        else:
            # No template supplied: build one from the class default.
            default_script = self.default_template % n
            if self.queue:
                default_script = self.queue_template % self.queue + \
                    '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
                    (self.name, default_script))
            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        self._temp_file.file.flush()
        # Make the script executable and flush it to disk before the batch
        # system reads it (e.g. LSF's bsub executes the submitted script).
        os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        self._temp_file.file.close()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job from the batch queue."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d
346
349
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet for the PBS batch system (qsub/qdel)."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = '\d+'
    job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
    job_array_template = '#PBS -t 1-%d'
    queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'
    # Script used when the user supplies no template file.
    default_template = """#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
363
366
class SGEEngineSet(PBSEngineSet):
    """BatchEngineSet for Sun Grid Engine (qsub/qdel with #$ directives)."""

    name = 'SGE'
    job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'
    # Script used when the user supplies no template file.
    default_template = """#$ -V
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
377
380
class LSFEngineSet(PBSEngineSet):
    """BatchEngineSet for the LSF batch system (bsub/bkill)."""

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    # Match an existing "#BSUB -J name[1-8]" job-array directive.  The
    # previous pattern ('#BSUB[ \t]-J+\w+\[\d+-\d+\]') quantified the
    # literal 'J' and allowed no whitespace between -J and the job name,
    # so real directives -- including this class's own job_array_template
    # output -- never matched and a duplicate array line was prepended.
    job_array_regexp = '#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]'
    job_array_template = '#BSUB -J ipengine[1-%d]'
    queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
    queue_template = '#BSUB -q %s'
    # Script used when the user supplies no template file.
    default_template = """#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
391
394
# Helper script pushed to each remote host: runs its arguments detached in
# the background, discarding all output, and echoes the child's pid.
sshx_template = """#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Helper script pushed to each remote host: TERMinates every ipengine
# process belonging to the current user.
engine_killer_template = """#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
400
403
401 class SSHEngineSet(object):
404 class SSHEngineSet(object):
402 sshx_template=sshx_template
405 sshx_template=sshx_template
403 engine_killer_template=engine_killer_template
406 engine_killer_template=engine_killer_template
404
407
405 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
408 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
406 """Start a controller on localhost and engines using ssh.
409 """Start a controller on localhost and engines using ssh.
407
410
408 The engine_hosts argument is a dict with hostnames as keys and
411 The engine_hosts argument is a dict with hostnames as keys and
409 the number of engine (int) as values. sshx is the name of a local
412 the number of engine (int) as values. sshx is the name of a local
410 file that will be used to run remote commands. This file is used
413 file that will be used to run remote commands. This file is used
411 to setup the environment properly.
414 to setup the environment properly.
412 """
415 """
413
416
414 self.temp_dir = tempfile.gettempdir()
417 self.temp_dir = tempfile.gettempdir()
415 if sshx is not None:
418 if sshx is not None:
416 self.sshx = sshx
419 self.sshx = sshx
417 else:
420 else:
418 # Write the sshx.sh file locally from our template.
421 # Write the sshx.sh file locally from our template.
419 self.sshx = os.path.join(
422 self.sshx = os.path.join(
420 self.temp_dir,
423 self.temp_dir,
421 '%s-main-sshx.sh' % os.environ['USER']
424 '%s-main-sshx.sh' % os.environ['USER']
422 )
425 )
423 f = open(self.sshx, 'w')
426 f = open(self.sshx, 'w')
424 f.writelines(self.sshx_template)
427 f.writelines(self.sshx_template)
425 f.close()
428 f.close()
426 self.engine_command = ipengine
429 self.engine_command = ipengine
427 self.engine_hosts = engine_hosts
430 self.engine_hosts = engine_hosts
428 # Write the engine killer script file locally from our template.
431 # Write the engine killer script file locally from our template.
429 self.engine_killer = os.path.join(
432 self.engine_killer = os.path.join(
430 self.temp_dir,
433 self.temp_dir,
431 '%s-local-engine_killer.sh' % os.environ['USER']
434 '%s-local-engine_killer.sh' % os.environ['USER']
432 )
435 )
433 f = open(self.engine_killer, 'w')
436 f = open(self.engine_killer, 'w')
434 f.writelines(self.engine_killer_template)
437 f.writelines(self.engine_killer_template)
435 f.close()
438 f.close()
436
439
437 def start(self, send_furl=False):
440 def start(self, send_furl=False):
438 dlist = []
441 dlist = []
439 for host in self.engine_hosts.keys():
442 for host in self.engine_hosts.keys():
440 count = self.engine_hosts[host]
443 count = self.engine_hosts[host]
441 d = self._start(host, count, send_furl)
444 d = self._start(host, count, send_furl)
442 dlist.append(d)
445 dlist.append(d)
443 return gatherBoth(dlist, consumeErrors=True)
446 return gatherBoth(dlist, consumeErrors=True)
444
447
445 def _start(self, hostname, count=1, send_furl=False):
448 def _start(self, hostname, count=1, send_furl=False):
446 if send_furl:
449 if send_furl:
447 d = self._scp_furl(hostname)
450 d = self._scp_furl(hostname)
448 else:
451 else:
449 d = defer.succeed(None)
452 d = defer.succeed(None)
450 d.addCallback(lambda r: self._scp_sshx(hostname))
453 d.addCallback(lambda r: self._scp_sshx(hostname))
451 d.addCallback(lambda r: self._ssh_engine(hostname, count))
454 d.addCallback(lambda r: self._ssh_engine(hostname, count))
452 return d
455 return d
453
456
454 def _scp_furl(self, hostname):
457 def _scp_furl(self, hostname):
455 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
458 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
456 cmd_list = scp_cmd.split()
459 cmd_list = scp_cmd.split()
457 cmd_list[1] = os.path.expanduser(cmd_list[1])
460 cmd_list[1] = os.path.expanduser(cmd_list[1])
458 log.msg('Copying furl file: %s' % scp_cmd)
461 log.msg('Copying furl file: %s' % scp_cmd)
459 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
462 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
460 return d
463 return d
461
464
462 def _scp_sshx(self, hostname):
465 def _scp_sshx(self, hostname):
463 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
466 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
464 self.sshx, hostname,
467 self.sshx, hostname,
465 self.temp_dir, os.environ['USER']
468 self.temp_dir, os.environ['USER']
466 )
469 )
467 print
470 print
468 log.msg("Copying sshx: %s" % scp_cmd)
471 log.msg("Copying sshx: %s" % scp_cmd)
469 sshx_scp = scp_cmd.split()
472 sshx_scp = scp_cmd.split()
470 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
473 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
471 return d
474 return d
472
475
473 def _ssh_engine(self, hostname, count):
476 def _ssh_engine(self, hostname, count):
474 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
477 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
475 hostname, self.temp_dir,
478 hostname, self.temp_dir,
476 os.environ['USER'], self.engine_command
479 os.environ['USER'], self.engine_command
477 )
480 )
478 cmds = exec_engine.split()
481 cmds = exec_engine.split()
479 dlist = []
482 dlist = []
480 log.msg("about to start engines...")
483 log.msg("about to start engines...")
481 for i in range(count):
484 for i in range(count):
482 log.msg('Starting engines: %s' % exec_engine)
485 log.msg('Starting engines: %s' % exec_engine)
483 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
486 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
484 dlist.append(d)
487 dlist.append(d)
485 return gatherBoth(dlist, consumeErrors=True)
488 return gatherBoth(dlist, consumeErrors=True)
486
489
487 def kill(self):
490 def kill(self):
488 dlist = []
491 dlist = []
489 for host in self.engine_hosts.keys():
492 for host in self.engine_hosts.keys():
490 d = self._killall(host)
493 d = self._killall(host)
491 dlist.append(d)
494 dlist.append(d)
492 return gatherBoth(dlist, consumeErrors=True)
495 return gatherBoth(dlist, consumeErrors=True)
493
496
494 def _killall(self, hostname):
497 def _killall(self, hostname):
495 d = self._scp_engine_killer(hostname)
498 d = self._scp_engine_killer(hostname)
496 d.addCallback(lambda r: self._ssh_kill(hostname))
499 d.addCallback(lambda r: self._ssh_kill(hostname))
497 # d.addErrback(self._exec_err)
500 # d.addErrback(self._exec_err)
498 return d
501 return d
499
502
500 def _scp_engine_killer(self, hostname):
503 def _scp_engine_killer(self, hostname):
501 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
504 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
502 self.engine_killer,
505 self.engine_killer,
503 hostname,
506 hostname,
504 self.temp_dir,
507 self.temp_dir,
505 os.environ['USER']
508 os.environ['USER']
506 )
509 )
507 cmds = scp_cmd.split()
510 cmds = scp_cmd.split()
508 log.msg('Copying engine_killer: %s' % scp_cmd)
511 log.msg('Copying engine_killer: %s' % scp_cmd)
509 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
512 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
510 return d
513 return d
511
514
512 def _ssh_kill(self, hostname):
515 def _ssh_kill(self, hostname):
513 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
516 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
514 hostname,
517 hostname,
515 self.temp_dir,
518 self.temp_dir,
516 os.environ['USER']
519 os.environ['USER']
517 )
520 )
518 log.msg('Killing engine: %s' % kill_cmd)
521 log.msg('Killing engine: %s' % kill_cmd)
519 kill_cmd = kill_cmd.split()
522 kill_cmd = kill_cmd.split()
520 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
523 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
521 return d
524 return d
522
525
    def _exec_err(self, r):
        """Errback: log a failure from a remote command without re-raising."""
        log.msg(r)
525
528
526 #-----------------------------------------------------------------------------
529 #-----------------------------------------------------------------------------
527 # Main functions for the different types of clusters
530 # Main functions for the different types of clusters
528 #-----------------------------------------------------------------------------
531 #-----------------------------------------------------------------------------
529
532
530 # TODO:
533 # TODO:
531 # The logic in these codes should be moved into classes like LocalCluster
534 # The logic in these codes should be moved into classes like LocalCluster
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
533 # The main functions should then just parse the command line arguments, create
536 # The main functions should then just parse the command line arguments, create
534 # the appropriate class and call a 'start' method.
537 # the appropriate class and call a 'start' method.
535
538
536
539
def check_security(args, cont_args):
    """Check to see if we should run with SSL support.

    Appends '-x'/'-y' to *cont_args* as requested.  Returns True when the
    configuration is usable; on a missing crypto stack it logs an error,
    stops the reactor and returns False.
    """
    # Security is wanted unless BOTH -x and -y were given.
    security_requested = not args.x or not args.y
    if security_requested and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    for flag, disabled in (('-x', args.x), ('-y', args.y)):
        if disabled:
            cont_args.append(flag)
    return True
550
553
551
554
def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files.

    When -r is given both --client-port and --engine-port must be nonzero;
    otherwise the error is logged, the reactor is stopped and False is
    returned.  Returns True when the options are consistent.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    # Reuse only works with explicitly pinned ports.
    if args.client_port == 0 or args.engine_port == 0:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.append('--client-port=%i' % args.client_port)
    cont_args.append('--engine-port=%i' % args.engine_port)
    return True
565
568
566
569
def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    # Startup errors are unrecoverable: log the Failure, then shut down.
    log.err(f)
    reactor.stop()
571
574
572
575
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to write its FURL file, then start engines.

    Unless *reuse* is set, a stale FURL file is removed first so we are sure
    to wait for the freshly-written one.  Returns the Deferred that fires
    once *start_engines* has been invoked.
    """
    if not reuse and os.path.isfile(furl_file):
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    # Poll for the file: 50 tries at 0.2s intervals (~10s budget).
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
583
586
584
587
def main_local(args):
    """Start a controller plus args.n engines as local subprocesses."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Engine log files are tagged with the controller's pid.
        engine_args = [
            '--logfile=%s' % pjoin(args.logdir, 'ipengine%s-' % cont_pid)]
        eset = LocalEngineSet(extra_args=engine_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
618
621
619
622
def main_mpi(args):
    """Start a controller locally and engines under mpirun/mpiexec."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Build the mpirun/mpiexec command line for the engine processes.
        raw_args = [
            args.cmd,
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir, 'ipengine%s-' % cont_pid),
        ]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start()

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
657
660
658
661
def main_pbs(args):
    """Start a controller locally and engines via PBS batch submission."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # A user-supplied batch script template must exist before we go on.
    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)

        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
691
694
def main_sge(args):
    """Start a controller locally and engines via SGE batch submission."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # A user-supplied batch script template must exist before we go on.
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript, args.sgequeue)

        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return sge_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
724
727
def main_lsf(args):
    """Start a controller locally and engines via LSF batch submission."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # A user-supplied batch script template must exist before we go on.
    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)

        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return lsf_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
757
760
758
761
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    # The clusterfile is executed as Python; its top-level names become
    # keys of this dict (Python 2 idiom — this script predates Python 3).
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # Idiom fix: setdefault replaces the deprecated dict.has_key check,
    # with identical behavior (send_furl defaults to False).
    clusterfile.setdefault('send_furl', False)

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            # Best-effort kill of the remote engines; the returned Deferred
            # is intentionally not waited on before stopping the reactor.
            ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
801
804
802
805
def get_args():
    """Build the ipcluster command-line parser and parse sys.argv.

    Returns the parsed argparse namespace.  Each subcommand installs its
    main_* entry point as args.func (plus extra defaults such as cmd for
    the MPI variants), which main() later schedules on the reactor.
    """
    # Options shared by every cluster type, inherited via parents=[...].
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    # local: controller and engines as plain subprocesses on this machine.
    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    # mpirun / mpiexec: engines launched through an MPI runner; both
    # subcommands share main_mpi and differ only in the cmd default.
    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    # pbs: engines submitted as a PBS batch job.
    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '-s',
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default=''
    )
    parser_pbs.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='pbsqueue',
        help='PBS queue to use when starting the engines',
        default=None,
    )
    parser_pbs.set_defaults(func=main_pbs)

    # sge: engines submitted through Sun Grid Engine.
    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '-s',
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
    parser_sge.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='sgequeue',
        help='SGE queue to use when starting the engines',
        default=None,
    )
    parser_sge.set_defaults(func=main_sge)

    # lsf: engines submitted through LSF.
    parser_lsf = subparsers.add_parser(
        'lsf',
        help='run an lsf cluster',
        parents=[base_parser]
    )

    parser_lsf.add_argument(
        '-s',
        '--lsf-script',
        type=str,
        dest='lsfscript',
        help='LSF script template',
        default='' # LSFEngineSet will create one if not specified
    )

    parser_lsf.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='lsfqueue',
        help='LSF queue to use when starting the engines',
        default=None,
    )
    parser_lsf.set_defaults(func=main_lsf)

    # ssh: engines started on remote hosts described by a clusterfile.
    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
990
993
def main():
    """Entry point: parse arguments, schedule startup, run the reactor."""
    args = get_args()
    # Defer the actual cluster startup until the reactor is running.
    cluster_main = args.func
    reactor.callWhenRunning(cluster_main, args)
    log.startLogging(sys.stdout)
    reactor.run()
996
999
# Allow running this module directly as a script.
if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now