add --queue to ipcluster's SGE/PBS/LSF subcmds
Justin Riley
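This commit threads a new -q/--queue option through the pbs, sge, and lsf subcommands and prepends the matching queue directive to the generated batch script. A quick usage sketch (the queue names below are hypothetical; substitute whatever queues your scheduler actually defines):

    $ ipcluster sge -n 8 --queue all.q
    $ ipcluster pbs -n 16 -q batch
    $ ipcluster lsf -n 4 -q normal -s my_lsf.template

If the supplied script template already contains a queue directive (#$ -q, #PBS -q, or #BSUB -q), the option leaves it untouched. The -s short form for the script-template flags is also added here.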
@@ -1,948 +1,998 @@
#!/usr/bin/env python
# encoding: utf-8

"""Start an IPython cluster = (controller + engines)."""

#-----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

import os
import re
import sys
import signal
import tempfile
pjoin = os.path.join

from twisted.internet import reactor, defer
from twisted.internet.protocol import ProcessProtocol
from twisted.internet.error import ProcessDone, ProcessTerminated
from twisted.internet.utils import getProcessOutput
from twisted.python import failure, log

from IPython.external import argparse
from IPython.external import Itpl
from IPython.genutils import (
    get_ipython_dir,
    get_log_dir,
    get_security_dir,
    num_cpus
)
from IPython.kernel.fcutil import have_crypto

# Create various ipython directories if they don't exist.
# This must be done before IPython.kernel.config is imported.
from IPython.iplib import user_setup
if os.name == 'posix':
    rc_suffix = ''
else:
    rc_suffix = '.ini'
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
get_log_dir()
get_security_dir()

from IPython.kernel.config import config_manager as kernel_config_manager
from IPython.kernel.error import SecurityError, FileTimeoutError
from IPython.kernel.fcutil import have_crypto
from IPython.kernel.twistedutil import gatherBoth, wait_for_file
from IPython.kernel.util import printer

#-----------------------------------------------------------------------------
# General process handling code
#-----------------------------------------------------------------------------


class ProcessStateError(Exception):
    pass

class UnknownStatus(Exception):
    pass

class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        value = status.value
        if isinstance(value, ProcessDone):
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            self.process_launcher.fire_stop_deferred(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        log.msg(data)

    def errReceived(self, data):
        log.err(data)

class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')


#-----------------------------------------------------------------------------
# Code for launching controller and engines
#-----------------------------------------------------------------------------


class ControllerLauncher(ProcessLauncher):

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)


class EngineLauncher(ProcessLauncher):

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)


class LocalEngineSet(object):

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        dlist = []
        for i in range(n):
            print "starting engine:", i
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

class BatchEngineSet(object):

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''
    submit_command = ''
    delete_command = ''
-    script_param_prefix = ''
    job_id_regexp = ''
    job_array_regexp = ''
+    job_array_template = ''
+    queue_regexp = ''
+    queue_template = ''
    default_template = ''

-    def __init__(self, template_file, **kwargs):
+    def __init__(self, template_file, queue, **kwargs):
        self.template_file = template_file
+        self.queue = queue

    def parse_job_id(self, output):
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
-        regex = re.compile(self.job_array_regexp)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            contents = open(self.template_file, 'r').read()
+            new_script = contents
+            regex = re.compile(self.job_array_regexp)
            if not regex.search(contents):
                log.msg("adding job array settings to %s script" % self.name)
-                contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
-            self._temp_file.write(contents)
-            self.template_file = self._temp_file.name
+                new_script = self.job_array_template % n + '\n' + new_script
+            regex = re.compile(self.queue_regexp)
+            if self.queue and not regex.search(contents):
+                log.msg("adding queue settings to %s script" % self.name)
+                new_script = self.queue_template % self.queue + '\n' + new_script
+            if new_script != contents:
+                self._temp_file.write(new_script)
+                self.template_file = self._temp_file.name
        else:
+            default_script = self.default_template % n
+            if self.queue:
+                default_script = self.queue_template % self.queue + \
+                                 '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
-                    (self.name, (self.default_template % n)))
-            self._temp_file.file.write(self.default_template % n)
+                    (self.name, default_script))
+            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        self._temp_file.file.flush()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d

class PBSEngineSet(BatchEngineSet):

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
337 script_param_prefix = "#PBS"
338 job_id_regexp = '\d+'
352 job_id_regexp = '\d+'
339 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
353 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
354 job_array_template = '#PBS -t 1-%d'
355 queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
356 queue_template = '#PBS -q %s'
340 default_template="""#PBS -V
357 default_template="""#PBS -V
341 #PBS -t 1-%d
358 #PBS -t 1-%d
342 #PBS -N ipengine
359 #PBS -N ipengine
343 eid=$(($PBS_ARRAYID - 1))
360 eid=$(($PBS_ARRAYID - 1))
344 ipengine --logfile=ipengine${eid}.log
361 ipengine --logfile=ipengine${eid}.log
345 """
362 """
346
363
347 class SGEEngineSet(PBSEngineSet):
364 class SGEEngineSet(PBSEngineSet):
348
365
349 name = 'SGE'
366 name = 'SGE'
350 script_param_prefix = "#$"
351 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
367 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
368 job_array_template = '#$ -t 1-%d'
369 queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
370 queue_template = '#$ -q %s'
352 default_template="""#$ -V
371 default_template="""#$ -V
353 #$ -t 1-%d
372 #$ -t 1-%d
354 #$ -N ipengine
373 #$ -N ipengine
355 eid=$(($SGE_TASK_ID - 1))
374 eid=$(($SGE_TASK_ID - 1))
356 ipengine --logfile=ipengine${eid}.log
375 ipengine --logfile=ipengine${eid}.log
357 """
376 """
358
377
359 class LSFEngineSet(PBSEngineSet):
378 class LSFEngineSet(PBSEngineSet):
360
379
361 name = 'LSF'
380 name = 'LSF'
362 submit_command = 'bsub'
381 submit_command = 'bsub'
363 delete_command = 'bkill'
382 delete_command = 'bkill'
364 script_param_prefix = "#BSUB"
383 job_array_regexp = '#BSUB[ \t]-J+\w+\[\d+-\d+\]'
365 job_array_regexp = '#BSUB[ \t]+\w+\[\d+-\d+\]'
384 job_array_template = '#BSUB -J ipengine[1-%d]'
366 default_template="""#BSUB ipengine[1-%d]
385 queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
386 queue_template = '#BSUB -q %s'
387 default_template="""#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""

sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""

class SSHEngineSet(object):
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engines (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to set up the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        print
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        log.msg(r)

#-----------------------------------------------------------------------------
# Main functions for the different types of clusters
#-----------------------------------------------------------------------------

# TODO:
# The logic in these codes should be moved into classes like LocalCluster
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.


def check_security(args, cont_args):
    """Check to see if we should run with SSL support."""
    if (not args.x or not args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True


def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files."""
    if args.r:
        cont_args.append('-r')
        if args.client_port == 0 or args.engine_port == 0:
            log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
            reactor.stop()
            return False
        cont_args.append('--client-port=%i' % args.client_port)
        cont_args.append('--engine-port=%i' % args.engine_port)
    return True


def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    log.err(f)
    reactor.stop()


def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to create FURL files and then start the engines."""
    if not reuse:
        if os.path.isfile(furl_file):
            os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d


def main_local(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def main_mpi(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def main_pbs(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
-        pbs_set = PBSEngineSet(args.pbsscript)
+        pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)

def main_sge(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
-        sge_set = SGEEngineSet(args.sgescript)
+        sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = sge_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)

def main_lsf(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
-        lsf_set = LSFEngineSet(args.lsfscript)
+        lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = lsf_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    if not clusterfile.has_key('send_furl'):
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def get_args():
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
+        '-s',
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default=''
    )
+    parser_pbs.add_argument(
+        '-q',
+        '--queue',
+        type=str,
+        dest='pbsqueue',
+        help='PBS queue to use when starting the engines',
+        default=None,
+    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
+        '-s',
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
+    parser_sge.add_argument(
+        '-q',
+        '--queue',
+        type=str,
+        dest='sgequeue',
+        help='SGE queue to use when starting the engines',
+        default=None,
+    )
    parser_sge.set_defaults(func=main_sge)

    parser_lsf = subparsers.add_parser(
        'lsf',
        help='run an lsf cluster',
        parents=[base_parser]
    )
+
    parser_lsf.add_argument(
+        '-s',
        '--lsf-script',
        type=str,
        dest='lsfscript',
        help='LSF script template',
        default='' # LSFEngineSet will create one if not specified
    )
+
+    parser_lsf.add_argument(
+        '-q',
+        '--queue',
+        type=str,
+        dest='lsfqueue',
+        help='LSF queue to use when starting the engines',
+        default=None,
+    )
    parser_lsf.set_defaults(func=main_lsf)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args

def main():
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()

if __name__ == '__main__':
    main()
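For readers tracing the new start() logic above, here is a minimal, self-contained sketch of how a submission script is now composed. The attribute values are copied from SGEEngineSet in the diff; the one-line template body and the queue name parallel.q are invented for illustration:

    import re

    # Attribute values copied from SGEEngineSet in the diff above.
    job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'

    def compose(contents, n, queue):
        """Prepend job-array and queue directives unless already present."""
        new_script = contents
        if not re.search(job_array_regexp, contents):
            new_script = job_array_template % n + '\n' + new_script
        if queue and not re.search(queue_regexp, contents):
            new_script = queue_template % queue + '\n' + new_script
        return new_script

    print compose('ipengine --logfile=ipengine.log\n', 8, 'parallel.q')
    # prints:
    # #$ -q parallel.q
    # #$ -t 1-8
    # ipengine --logfile=ipengine.log

As in the diff, both checks run against the original contents, so a template that already sets -t or -q is submitted unchanged.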