##// END OF EJS Templates
add experimental LSF support (needs testing)
Justin Riley -
Show More
@@ -1,885 +1,911 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 import tempfile
22 pjoin = os.path.join
22 pjoin = os.path.join
23
23
24 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import failure, log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.genutils import (
32 from IPython.genutils import (
33 get_ipython_dir,
33 get_ipython_dir,
34 get_log_dir,
34 get_log_dir,
35 get_security_dir,
35 get_security_dir,
36 num_cpus
36 num_cpus
37 )
37 )
38 from IPython.kernel.fcutil import have_crypto
38 from IPython.kernel.fcutil import have_crypto
39
39
# Create various ipython directories if they don't exist.
# This must be done before IPython.kernel.config is imported.
from IPython.iplib import user_setup
if os.name == 'posix':
    rc_suffix = ''
else:
    # Non-posix (Windows) config files carry a .ini suffix.
    rc_suffix = '.ini'
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
# Calling these ensures the log and security directories exist before
# the kernel config machinery below needs them.
get_log_dir()
get_security_dir()
50
50
51 from IPython.kernel.config import config_manager as kernel_config_manager
51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
53 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
55 from IPython.kernel.util import printer
56
56
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # General process handling code
58 # General process handling code
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60
61
61
class ProcessStateError(Exception):
    """Raised when a ProcessLauncher operation is invalid for its
    current state (e.g. starting an already-started process)."""
    pass
64
64
class UnknownStatus(Exception):
    """Raised when a child process ends with a status that is neither
    ProcessDone nor ProcessTerminated."""
    pass
67
67
class LauncherProcessProtocol(ProcessProtocol):
    """Relays child-process lifecycle events back to a ProcessLauncher."""

    def __init__(self, process_launcher):
        # The launcher whose start/stop deferreds we fire.
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive; report its pid to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit: report the conventional exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(reason, ProcessTerminated):
            # Abnormal exit: forward the full termination details.
            details = {'exit_code': reason.exitCode,
                       'signal': reason.signal,
                       'status': reason.status}
            self.process_launcher.fire_stop_deferred(details)
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward child stdout to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward child stderr to the Twisted error log.
        log.err(data)
97
97
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args is a full argv list: element 0 is the executable,
        # and the whole list (including the executable) is handed to
        # spawnProcess as args, per the argv convention.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Internal bookkeeping for a single start/stop cycle.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        # Called by LauncherProcessProtocol.connectionMade once the
        # child process exists; transitions to the 'running' state.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred firing with its pid.

        May only be called once; later calls fail with ProcessStateError.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires with the exit status when the
        process ends; fails with ProcessStateError if already complete."""
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by LauncherProcessProtocol.processEnded; exit_code is 0
        # for a clean exit or a dict of termination details otherwise.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        # Silently a no-op unless the process is currently running.
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now, then KILL after `delay` seconds as a fallback."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
177
177
178
178
179 #-----------------------------------------------------------------------------
179 #-----------------------------------------------------------------------------
180 # Code for launching controller and engines
180 # Code for launching controller and engines
181 #-----------------------------------------------------------------------------
181 #-----------------------------------------------------------------------------
182
182
183
183
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run ipcontroller."""

    def __init__(self, extra_args=None):
        argv = self._controller_command()
        self.extra_args = extra_args
        if extra_args is not None:
            argv.extend(extra_args)
        ProcessLauncher.__init__(self, argv)

    @staticmethod
    def _controller_command():
        """Build the base argv used to launch ipcontroller."""
        if sys.platform != 'win32':
            return ['ipcontroller']
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel.scripts import ipcontroller
        script_location = ipcontroller.__file__.replace('.pyc', '.py')
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent wierd conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        return [sys.executable, '-u', script_location]
204
204
205
205
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run ipengine."""

    def __init__(self, extra_args=None):
        argv = self._engine_command()
        self.extra_args = extra_args
        if extra_args is not None:
            argv.extend(extra_args)
        ProcessLauncher.__init__(self, argv)

    @staticmethod
    def _engine_command():
        """Build the base argv used to launch ipengine."""
        if sys.platform != 'win32':
            return ['ipengine']
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel.scripts import ipengine
        script_location = ipengine.__file__.replace('.pyc', '.py')
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent wierd conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        return [sys.executable, '-u', script_location]
226
226
227
227
228 class LocalEngineSet(object):
228 class LocalEngineSet(object):
229
229
230 def __init__(self, extra_args=None):
230 def __init__(self, extra_args=None):
231 self.extra_args = extra_args
231 self.extra_args = extra_args
232 self.launchers = []
232 self.launchers = []
233
233
234 def start(self, n):
234 def start(self, n):
235 dlist = []
235 dlist = []
236 for i in range(n):
236 for i in range(n):
237 print "starting engine:", i
237 print "starting engine:", i
238 el = EngineLauncher(extra_args=self.extra_args)
238 el = EngineLauncher(extra_args=self.extra_args)
239 d = el.start()
239 d = el.start()
240 self.launchers.append(el)
240 self.launchers.append(el)
241 dlist.append(d)
241 dlist.append(d)
242 dfinal = gatherBoth(dlist, consumeErrors=True)
242 dfinal = gatherBoth(dlist, consumeErrors=True)
243 dfinal.addCallback(self._handle_start)
243 dfinal.addCallback(self._handle_start)
244 return dfinal
244 return dfinal
245
245
246 def _handle_start(self, r):
246 def _handle_start(self, r):
247 log.msg('Engines started with pids: %r' % r)
247 log.msg('Engines started with pids: %r' % r)
248 return r
248 return r
249
249
250 def _handle_stop(self, r):
250 def _handle_stop(self, r):
251 log.msg('Engines received signal: %r' % r)
251 log.msg('Engines received signal: %r' % r)
252 return r
252 return r
253
253
254 def signal(self, sig):
254 def signal(self, sig):
255 dlist = []
255 dlist = []
256 for el in self.launchers:
256 for el in self.launchers:
257 d = el.get_stop_deferred()
257 d = el.get_stop_deferred()
258 dlist.append(d)
258 dlist.append(d)
259 el.signal(sig)
259 el.signal(sig)
260 dfinal = gatherBoth(dlist, consumeErrors=True)
260 dfinal = gatherBoth(dlist, consumeErrors=True)
261 dfinal.addCallback(self._handle_stop)
261 dfinal.addCallback(self._handle_stop)
262 return dfinal
262 return dfinal
263
263
264 def interrupt_then_kill(self, delay=1.0):
264 def interrupt_then_kill(self, delay=1.0):
265 dlist = []
265 dlist = []
266 for el in self.launchers:
266 for el in self.launchers:
267 d = el.get_stop_deferred()
267 d = el.get_stop_deferred()
268 dlist.append(d)
268 dlist.append(d)
269 el.interrupt_then_kill(delay)
269 el.interrupt_then_kill(delay)
270 dfinal = gatherBoth(dlist, consumeErrors=True)
270 dfinal = gatherBoth(dlist, consumeErrors=True)
271 dfinal.addCallback(self._handle_stop)
271 dfinal.addCallback(self._handle_stop)
272 return dfinal
272 return dfinal
273
273
class BatchEngineSet(object):
    """Base class for engine sets submitted through a batch queue.

    Subclasses must fill in the class attributes below. See
    PBSEngineSet/SGEEngineSet for examples.
    """

    name = ''                  # human-readable batch system name
    submit_command = ''        # e.g. 'qsub'
    delete_command = ''        # e.g. 'qdel'
    script_param_prefix = ''   # directive prefix, e.g. '#PBS'
    job_id_regexp = ''         # extracts the job id from submit output
    job_array_regexp = ''      # detects an existing job-array directive
    default_template = ''      # script used when no template_file is given

    def __init__(self, template_file, **kwargs):
        # Path to a user-supplied batch script, or a false value to use
        # self.default_template.  Extra kwargs are accepted but unused.
        self.template_file = template_file

    def parse_job_id(self, output):
        """Extract and record the job id from the submit command output.

        Raises Exception when no id can be found in `output`.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        # Print the traceback, then re-raise so the failure propagates.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Write a batch script for n engines and submit it.

        Returns a deferred firing with the parsed job id.
        """
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        regex = re.compile(self.job_array_regexp)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            # Read the user's script, closing the handle explicitly
            # (the original leaked it until garbage collection).
            template = open(self.template_file, 'r')
            try:
                contents = template.read()
            finally:
                template.close()
            if not regex.search(contents):
                # The user script has no job-array directive; prepend one.
                log.msg("adding job array settings to %s script" % self.name)
                contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
            self._temp_file.write(contents)
        else:
            log.msg("using default ipengine %s script: \n%s" %
                    (self.name, (self.default_template % n)))
            self._temp_file.write(self.default_template % n)
        # Submit the temp file in either case.
        self.template_file = self._temp_file.name
        self._temp_file.flush()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job via the batch system."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
331
331
class PBSEngineSet(BatchEngineSet):
    """Engine set submitted through the PBS batch system (qsub/qdel)."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    script_param_prefix = "#PBS"
    # Job ids are the first run of digits in the qsub output.
    job_id_regexp = r'\d+'
    # Matches an existing "#PBS -t <n>" job-array directive.
    job_array_regexp = r'#PBS[ \t]+-t[ \t]+\d+'
    default_template="""#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
346
346
class SGEEngineSet(PBSEngineSet):
    """Engine set submitted through SGE; inherits qsub/qdel from PBS."""

    name = 'SGE'
    script_param_prefix = "#$"
    # Matches an existing "#$ -t <n>" job-array directive.
    job_array_regexp = r'#\$[ \t]+-t[ \t]+\d+'
    default_template="""#$ -V
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
358
358
class LSFEngineSet(PBSEngineSet):
    """Engine set submitted through the LSF batch system (bsub/bkill).

    NOTE(review): many LSF versions only honor embedded #BSUB directives
    when the script is fed on stdin (``bsub < script``), not passed as an
    argument the way BatchEngineSet.start submits it -- confirm on a real
    cluster before relying on this.
    """

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    script_param_prefix = "#BSUB"
    # LSF declares a job array via the -J option, e.g.
    # "#BSUB -J ipengine[1-8]".  The original template omitted -J, which
    # is not a valid bsub directive; the regexp matches the fixed form.
    job_array_regexp = r'#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]'
    default_template="""#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
370
# Helper script copied to each ssh host: runs the given command detached
# with stdout/stderr discarded, then echoes the background pid.
# NOTE(review): "&>" is a bash extension, not POSIX sh -- confirm the
# remote /bin/sh is bash-compatible.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Script copied to each host to kill engines: TERMs every ipengine
# process owned by the current user ("[i]pengine" keeps grep from
# matching itself).
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
367
379
class SSHEngineSet(object):
    """Start and kill engines on remote hosts over ssh.

    A small sshx helper script is copied to each host and used to run
    ipengine detached from the ssh session; a second script is copied
    over to kill the remote engines.
    """
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            # NOTE(review): assumes os.environ['USER'] is set -- confirm
            # for non-login environments.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host."""
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Optionally copy the furl file, then the sshx helper, then
        # launch the engines -- each step chained on the previous one.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # Expand the local ~ ourselves; the remote side expands its own.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        # (removed a stray bare "print" that emitted a blank debug line)
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        # One ssh invocation per engine requested on this host.
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill the engines on every configured host."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        # Copy the killer script over, then execute it remotely.
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        log.msg(r)
492
504
493 #-----------------------------------------------------------------------------
505 #-----------------------------------------------------------------------------
494 # Main functions for the different types of clusters
506 # Main functions for the different types of clusters
495 #-----------------------------------------------------------------------------
507 #-----------------------------------------------------------------------------
496
508
497 # TODO:
509 # TODO:
498 # The logic in these codes should be moved into classes like LocalCluster
510 # The logic in these codes should be moved into classes like LocalCluster
499 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
511 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
500 # The main functions should then just parse the command line arguments, create
512 # The main functions should then just parse the command line arguments, create
501 # the appropriate class and call a 'start' method.
513 # the appropriate class and call a 'start' method.
502
514
503
515
def check_security(args, cont_args):
    """Check to see if we should run with SSL support."""
    # Secure mode is requested unless both -x and -y were given; it
    # requires pyOpenSSL to be importable.
    if not (args.x and args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    # Forward whichever insecure-mode flags were given to ipcontroller.
    for flag, given in (('-x', args.x), ('-y', args.y)):
        if given:
            cont_args.append(flag)
    return True
517
529
518
530
def check_reuse(args, cont_args):
    """Decide whether to reuse existing FURL files.

    When -r is given, both --client-port and --engine-port must be set to
    nonzero values; otherwise an error is logged, the reactor is stopped
    and False is returned.  Returns True when the settings are usable,
    appending the matching controller flags to cont_args.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    if args.client_port == 0 or args.engine_port == 0:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.append('--client-port=%i' % args.client_port)
    cont_args.append('--engine-port=%i' % args.engine_port)
    return True
532
544
533
545
def _err_and_stop(f):
    """Terminal errback: log the failure *f*, then shut the reactor down."""
    log.err(f)
    reactor.stop()
538
550
539
551
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait until the controller has written its FURL file, then launch engines.

    Unless FURL files are being reused, any stale file is removed first so
    that we genuinely wait for the new controller to produce it.
    """
    if not reuse and os.path.isfile(furl_file):
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    def _controller_ready(_):
        log.msg('Controller started')
        return start_engines(cont_pid)
    d.addCallback(_controller_ready)
    return d
550
562
551
563
def main_local(args):
    """Start a controller and engines as subprocesses on this machine."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Abort early on unusable security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        log_prefix = pjoin(args.logdir, 'ipengine%s-' % cont_pid)
        eset = LocalEngineSet(extra_args=['--logfile=%s' % log_prefix])
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # These delays appear reliable in letting everything exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
585
597
586
598
def main_mpi(args):
    """Start a local controller and engines through mpirun/mpiexec (args.cmd)."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Abort early on unusable security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        raw_args = [
            args.cmd,
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir, 'ipengine%s-' % cont_pid),
        ]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # These delays appear reliable in letting everything exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT, shutdown)
        return eset.start()

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
624
636
625
637
def main_pbs(args):
    """Start a local controller and submit engines through PBS.

    Mirrors main_sge: a --pbs-script that is named but missing is reported
    up front instead of failing after the controller has already started.
    """
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # Fail early (consistent with main_sge) when a script path was given
    # but the file does not exist.  An empty path is still accepted.
    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
654
666
def main_sge(args):
    """Start a local controller and submit engines through SGE."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Abort early on unusable security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    # A named-but-missing script template is a fatal configuration error.
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript)
        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT, shutdown)
        return sge_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
687
699
688
700
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    # Execute the clusterfile as Python and read its settings out of the
    # resulting namespace; 'send_furl' defaults to False when omitted.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    clusterfile.setdefault('send_furl', False)

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Abort early on unusable security or FURL-reuse settings.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT, shutdown)
        return ssh_set.start(clusterfile['send_furl'])

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
731
743
732
744
def main_lsf(args):
    """Start a local controller and submit engines through LSF.

    NOTE(review): assumes an LSFEngineSet class is defined earlier in this
    file alongside PBSEngineSet/SGEEngineSet (this commit adds experimental
    LSF support) -- confirm before relying on it.
    """
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # Fail early when a script path was given but the file does not exist.
    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript)
        def shutdown(signum, frame):
            log.msg('Stopping lsf cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT, shutdown)
        return lsf_set.start(args.n)
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)


def get_args():
    """Build the ipcluster argument parser and parse sys.argv.

    A shared base parser holds the options common to every cluster type;
    one subparser per cluster type (local, mpirun, mpiexec, pbs, sge, lsf,
    ssh) sets 'func' to the main_* function that starts that cluster.

    Bug fix: the 'lsf' subparser previously set func=main_sge, which would
    crash with AttributeError (lsf args carry 'lsfscript', not 'sgescript');
    it now dispatches to main_lsf.
    """
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default=''
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
    parser_sge.set_defaults(func=main_sge)

    parser_lsf = subparsers.add_parser(
        'lsf',
        help='run an lsf cluster',
        parents=[base_parser]
    )
    parser_lsf.add_argument(
        '--lsf-script',
        type=str,
        dest='lsfscript',
        help='LSF script template',
        default='' # LSFEngineSet will create one if not specified
    )
    # Was main_sge (copy/paste error): lsf args have no 'sgescript' attribute.
    parser_lsf.set_defaults(func=main_lsf)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
877
903
def main():
    """Entry point: parse arguments, schedule the chosen starter, run reactor."""
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()


if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now