##// END OF EJS Templates
update PBSEngineSet to use job arrays...
Justin Riley -
Show More
@@ -1,908 +1,885 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 import tempfile
22 pjoin = os.path.join
22 pjoin = os.path.join
23
23
24 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import failure, log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.genutils import (
32 from IPython.genutils import (
33 get_ipython_dir,
33 get_ipython_dir,
34 get_log_dir,
34 get_log_dir,
35 get_security_dir,
35 get_security_dir,
36 num_cpus
36 num_cpus
37 )
37 )
38 from IPython.kernel.fcutil import have_crypto
38 from IPython.kernel.fcutil import have_crypto
39
39
40 # Create various ipython directories if they don't exist.
40 # Create various ipython directories if they don't exist.
41 # This must be done before IPython.kernel.config is imported.
41 # This must be done before IPython.kernel.config is imported.
42 from IPython.iplib import user_setup
42 from IPython.iplib import user_setup
43 if os.name == 'posix':
43 if os.name == 'posix':
44 rc_suffix = ''
44 rc_suffix = ''
45 else:
45 else:
46 rc_suffix = '.ini'
46 rc_suffix = '.ini'
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 get_log_dir()
48 get_log_dir()
49 get_security_dir()
49 get_security_dir()
50
50
51 from IPython.kernel.config import config_manager as kernel_config_manager
51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
53 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
55 from IPython.kernel.util import printer
56
56
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # General process handling code
58 # General process handling code
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60
61
61
class ProcessStateError(Exception):
    """Raised when an operation is attempted on a process in the wrong state."""
    pass
64
64
class UnknownStatus(Exception):
    """Raised when a child process exits with an unrecognized status object."""
    pass
67
67
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Translates Twisted process events into calls back into the owning
    ProcessLauncher so it can fire its start/stop deferreds.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # Child process is up: hand its pid back to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        value = status.value
        if isinstance(value, ProcessDone):
            # Clean exit is reported as exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            # Abnormal exit: report the full termination details.
            self.process_launcher.fire_stop_deferred(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward the child's stdout to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward the child's stderr to the Twisted error log.
        log.err(data)
97
97
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the whole list (including
        # argv[0]) is passed to spawnProcess as the argument vector.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Return all launch state to its pristine, pre-start condition.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # one of: before, running, after

    @property
    def running(self):
        # True only while the child process is alive.
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        # Called by the protocol once the child is up.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; fires the returned deferred with its pid."""
        if self.state != 'before':
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """Return a deferred that fires when the process exits."""
        if self.state in ('running', 'before'):
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        s = 'this process is already complete'
        return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by the protocol when the child exits; notify all waiters.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for waiter in self.stop_deferreds:
            waiter.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt politely, then force-kill after `delay` seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
177
177
178
178
179 #-----------------------------------------------------------------------------
179 #-----------------------------------------------------------------------------
180 # Code for launching controller and engines
180 # Code for launching controller and engines
181 #-----------------------------------------------------------------------------
181 #-----------------------------------------------------------------------------
182
182
183
183
class ControllerLauncher(ProcessLauncher):
    """Launch an ipcontroller process, locating the script per platform."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # The -u option turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, sys.executable guarantees we pick up the right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
204
204
205
205
class EngineLauncher(ProcessLauncher):
    """Launch an ipengine process, locating the script per platform."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            # (The original comment said "ipcontroller" — a copy-paste slip.)
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
226
226
227
227
228 class LocalEngineSet(object):
228 class LocalEngineSet(object):
229
229
230 def __init__(self, extra_args=None):
230 def __init__(self, extra_args=None):
231 self.extra_args = extra_args
231 self.extra_args = extra_args
232 self.launchers = []
232 self.launchers = []
233
233
234 def start(self, n):
234 def start(self, n):
235 dlist = []
235 dlist = []
236 for i in range(n):
236 for i in range(n):
237 print "starting engine:", i
237 print "starting engine:", i
238 el = EngineLauncher(extra_args=self.extra_args)
238 el = EngineLauncher(extra_args=self.extra_args)
239 d = el.start()
239 d = el.start()
240 self.launchers.append(el)
240 self.launchers.append(el)
241 dlist.append(d)
241 dlist.append(d)
242 dfinal = gatherBoth(dlist, consumeErrors=True)
242 dfinal = gatherBoth(dlist, consumeErrors=True)
243 dfinal.addCallback(self._handle_start)
243 dfinal.addCallback(self._handle_start)
244 return dfinal
244 return dfinal
245
245
246 def _handle_start(self, r):
246 def _handle_start(self, r):
247 log.msg('Engines started with pids: %r' % r)
247 log.msg('Engines started with pids: %r' % r)
248 return r
248 return r
249
249
250 def _handle_stop(self, r):
250 def _handle_stop(self, r):
251 log.msg('Engines received signal: %r' % r)
251 log.msg('Engines received signal: %r' % r)
252 return r
252 return r
253
253
254 def signal(self, sig):
254 def signal(self, sig):
255 dlist = []
255 dlist = []
256 for el in self.launchers:
256 for el in self.launchers:
257 d = el.get_stop_deferred()
257 d = el.get_stop_deferred()
258 dlist.append(d)
258 dlist.append(d)
259 el.signal(sig)
259 el.signal(sig)
260 dfinal = gatherBoth(dlist, consumeErrors=True)
260 dfinal = gatherBoth(dlist, consumeErrors=True)
261 dfinal.addCallback(self._handle_stop)
261 dfinal.addCallback(self._handle_stop)
262 return dfinal
262 return dfinal
263
263
264 def interrupt_then_kill(self, delay=1.0):
264 def interrupt_then_kill(self, delay=1.0):
265 dlist = []
265 dlist = []
266 for el in self.launchers:
266 for el in self.launchers:
267 d = el.get_stop_deferred()
267 d = el.get_stop_deferred()
268 dlist.append(d)
268 dlist.append(d)
269 el.interrupt_then_kill(delay)
269 el.interrupt_then_kill(delay)
270 dfinal = gatherBoth(dlist, consumeErrors=True)
270 dfinal = gatherBoth(dlist, consumeErrors=True)
271 dfinal.addCallback(self._handle_stop)
271 dfinal.addCallback(self._handle_stop)
272 return dfinal
272 return dfinal
273
273
274
class BatchEngineSet(object):
    """Base class for engine sets submitted through a batch queue as a job array.

    Subclasses must fill these in. See PBSEngineSet/SGEEngineSet.
    """

    name = ''                  # human-readable queue-system name, e.g. 'PBS'
    submit_command = ''        # job submission command, e.g. 'qsub'
    delete_command = ''        # job deletion command, e.g. 'qdel'
    script_param_prefix = ''   # directive prefix in scripts, e.g. '#PBS' or '#$'
    job_id_regexp = ''         # regexp extracting the job id from submit output
    job_array_regexp = ''      # regexp detecting an existing job-array directive
    default_template = ''      # fallback submission script; takes %d = n engines

    def __init__(self, template_file, **kwargs):
        self.template_file = template_file

    def parse_job_id(self, output):
        """Extract and remember the queue job id from submission output.

        Raises if no id matching `job_id_regexp` can be found.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        # Log the failure's traceback, then re-raise so callers see it.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a job array of n engines; the result fires with the job id."""
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        regex = re.compile(self.job_array_regexp)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            # Close the template file explicitly instead of leaking the
            # handle from an anonymous open(...).read().
            template = open(self.template_file, 'r')
            try:
                contents = template.read()
            finally:
                template.close()
            if not regex.search(contents):
                # No job-array directive present: prepend one for n tasks.
                log.msg("adding job array settings to %s script" % self.name)
                contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
            self._temp_file.write(contents)
            self.template_file = self._temp_file.name
        else:
            log.msg("using default ipengine %s script: \n%s" %
                (self.name, (self.default_template % n)))
            self._temp_file.file.write(self.default_template % n)
            self.template_file = self._temp_file.name
        self._temp_file.file.flush()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job array from the queue."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
331
class PBSEngineSet(BatchEngineSet):
    """Engine set submitted to PBS/Torque as a qsub job array."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    script_param_prefix = "#PBS"
    # Raw strings so the regex escapes are not mangled by Python.
    job_id_regexp = r'\d+'
    job_array_regexp = r'#PBS[ \t]+-t[ \t]+\d+'
    default_template="""#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
346
class SGEEngineSet(PBSEngineSet):
    """Engine set submitted to Sun Grid Engine as a qsub job array."""

    name = 'SGE'
    script_param_prefix = "#$"
    # Raw string so the regex escapes (#\$, \t, \d) are not mangled by Python.
    job_array_regexp = r'#\$[ \t]+-t[ \t]+\d+'
    default_template="""#$ -V
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
381
358
# Shell wrapper copied to each remote host: backgrounds its arguments,
# discards their output, and echoes the background pid so the caller
# can track the remote process.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""
386
363
# Shell script copied to each remote host: TERMinates every ipengine
# process owned by the current user ([i]pengine keeps grep from
# matching its own command line).
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
390
367
class SSHEngineSet(object):
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            script = open(self.sshx, 'w')
            script.writelines(self.sshx_template)
            script.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        killer = open(self.engine_killer, 'w')
        killer.writelines(self.engine_killer_template)
        killer.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host."""
        dlist = [self._start(host, self.engine_hosts[host], send_furl)
                 for host in self.engine_hosts.keys()]
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Optionally ship the furl, then the sshx helper, then launch engines.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        # Copy the controller's engine furl into the remote security dir.
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        return getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)

    def _scp_sshx(self, hostname):
        # Copy the sshx launch helper to the remote host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        print
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        return getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)

    def _ssh_engine(self, hostname, count):
        # Launch `count` engines on the host through the sshx helper.
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        log.msg("about to start engines...")
        dlist = []
        for _ in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            dlist.append(getProcessOutput(cmds[0], cmds[1:], env=os.environ))
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Run the engine-killer script on every host."""
        dlist = [self._killall(host) for host in self.engine_hosts.keys()]
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        # Copy the killer script to the remote host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        return getProcessOutput(cmds[0], cmds[1:], env=os.environ)

    def _ssh_kill(self, hostname):
        # Execute the copied killer script over ssh.
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        return getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)

    def _exec_err(self, r):
        log.msg(r)
515
492
516 #-----------------------------------------------------------------------------
493 #-----------------------------------------------------------------------------
517 # Main functions for the different types of clusters
494 # Main functions for the different types of clusters
518 #-----------------------------------------------------------------------------
495 #-----------------------------------------------------------------------------
519
496
# TODO:
# The logic in these functions should be moved into classes like LocalCluster,
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
525
502
526
503
def check_security(args, cont_args):
    """Decide whether the requested security settings can be honoured.

    Appends -x/-y flags to *cont_args* as requested.  If SSL is needed on
    either side but pyOpenSSL is unavailable, logs an error, stops the
    reactor and returns False; otherwise returns True.
    """
    needs_ssl = not args.x or not args.y
    if needs_ssl and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True
540
517
541
518
def check_reuse(args, cont_args):
    """Configure controller arguments for FURL-file reuse.

    When -r is given, both --client-port and --engine-port must be set to
    nonzero values; otherwise an error is logged, the reactor is stopped
    and False is returned.  Returns True when startup may proceed.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    if args.client_port == 0 or args.engine_port == 0:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.append('--client-port=%i' % args.client_port)
    cont_args.append('--engine-port=%i' % args.engine_port)
    return True
555
532
556
533
def _err_and_stop(f):
    """Errback: log the failure *f*, then halt the reactor (fatal error)."""
    log.err(f)
    reactor.stop()
561
538
562
539
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait until the controller writes *furl_file*, then launch engines.

    When *reuse* is false any stale FURL file is removed first, so we are
    guaranteed to wait for the freshly-written one.
    """
    if not reuse and os.path.isfile(furl_file):
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
573
550
574
551
def main_local(args):
    """Start a controller plus *args.n* engines as local subprocesses."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early if security or FURL-reuse options are inconsistent.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        logfile = pjoin(args.logdir, 'ipengine%s-' % cont_pid)
        eset = LocalEngineSet(extra_args=['--logfile=%s' % logfile])

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
608
585
609
586
def main_mpi(args):
    """Start a controller locally and launch engines via mpirun/mpiexec."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early if security or FURL-reuse options are inconsistent.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Build the mpirun/mpiexec command line that spawns the engines.
        raw_args = [args.cmd, '-n', str(args.n), 'ipengine', '-l',
                    pjoin(args.logdir, 'ipengine%s-' % cont_pid)]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start()

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
647
624
648
625
def main_pbs(args):
    """Start a controller locally and engines through a PBS batch queue.

    If --pbs-script names a template it must exist on disk; with the empty
    default, PBSEngineSet is expected to generate one (mirroring SGE).
    """
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # Consistency with main_sge: fail early on a missing script template
    # instead of letting qsub fail later with a less obvious error.
    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)

        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
677
654
def main_sge(args):
    """Start a controller locally and engines through an SGE queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Bail out early if security or FURL-reuse options are inconsistent.
    if not check_security(args, cont_args):
        return
    if not check_reuse(args, cont_args):
        return

    # A user-supplied script template must actually exist on disk.
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript)

        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return sge_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
710
687
711
688
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # dict.has_key() is deprecated; the 'in' operator is the idiomatic test.
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)

        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return ssh_set.start(clusterfile['send_furl'])

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
754
731
755
732
def get_args():
    """Build the ipcluster argument parser and parse the command line."""
    # Options shared by every cluster type live on a help-less parent parser.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r', action='store_true', dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port')
    base_parser.add_argument(
        '--client-port', type=int, dest='client_port', default=0,
        help='the port the controller will listen on for client connections')
    base_parser.add_argument(
        '--engine-port', type=int, dest='engine_port', default=0,
        help='the port the controller will listen on for engine connections')
    base_parser.add_argument(
        '-x', action='store_true', dest='x',
        help='turn off client security')
    base_parser.add_argument(
        '-y', action='store_true', dest='y',
        help='turn off engine security')
    base_parser.add_argument(
        '--logdir', type=str, dest='logdir',
        default=pjoin(get_ipython_dir(), 'log'),
        help='directory to put log files (default=$IPYTHONDIR/log)')
    base_parser.add_argument(
        '-n', '--num', type=int, dest='n', default=2,
        help='the number of engines to start')

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    # One subcommand per cluster flavour; each routes to its main_* handler.
    parser_local = subparsers.add_parser(
        'local', help='run a local cluster', parents=[base_parser])
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun', help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser])
    parser_mpirun.add_argument(
        '--mpi', type=str,
        dest='mpi',  # Don't put a default here to allow no MPI support
        help='how to call MPI_Init (default=mpi4py)')
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec', help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser])
    parser_mpiexec.add_argument(
        '--mpi', type=str,
        dest='mpi',  # Don't put a default here to allow no MPI support
        help='how to call MPI_Init (default=mpi4py)')
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs', help='run a pbs cluster', parents=[base_parser])
    parser_pbs.add_argument(
        '--pbs-script', type=str, dest='pbsscript', default='',
        help='PBS script template')
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge', help='run an sge cluster', parents=[base_parser])
    parser_sge.add_argument(
        '--sge-script', type=str, dest='sgescript',
        default='',  # SGEEngineSet will create one if not specified
        help='SGE script template')
    parser_sge.set_defaults(func=main_sge)

    parser_ssh = subparsers.add_parser(
        'ssh', help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser])
    parser_ssh.add_argument(
        '--clusterfile', type=str, dest='clusterfile',
        default='clusterfile.py',
        help='python file describing the cluster')
    parser_ssh.add_argument(
        '--sshx', type=str, dest='sshx',
        help='sshx launcher helper')
    parser_ssh.set_defaults(func=main_ssh)

    return parser.parse_args()
900
877
def main():
    """Entry point: schedule the chosen cluster starter, then run the reactor."""
    args = get_args()
    # args.func is the main_* handler selected by the subcommand.
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()


if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now