##// END OF EJS Templates
Removing unneeded log.msg in ipcluster.py
Brian Granger -
Show More
@@ -1,737 +1,736
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 import tempfile
22 pjoin = os.path.join
22 pjoin = os.path.join
23
23
24 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import failure, log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.genutils import get_ipython_dir, num_cpus
32 from IPython.genutils import get_ipython_dir, num_cpus
33 from IPython.kernel.fcutil import have_crypto
33 from IPython.kernel.fcutil import have_crypto
34 from IPython.kernel.error import SecurityError
34 from IPython.kernel.error import SecurityError
35 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.fcutil import have_crypto
36 from IPython.kernel.twistedutil import gatherBoth
36 from IPython.kernel.twistedutil import gatherBoth
37 from IPython.kernel.util import printer
37 from IPython.kernel.util import printer
38
38
39
39
40 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
41 # General process handling code
41 # General process handling code
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43
43
def find_exe(cmd):
    """Return the full path to cmd on Windows, trying cmd.exe then cmd.bat.

    This uses win32api.SearchPath to walk os.environ['PATH'].  Raises
    ImportError when pywin32 is not installed.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    else:
        try:
            # Fixed typo: the path/offset pair was unpacked into a
            # misspelled name ('offest') here.
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.exe')
        except Exception:
            # SearchPath failed for the .exe; fall back to a .bat of the
            # same name.  (Was a bare except, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.bat')
        return path
55
55
class ProcessStateError(Exception):
    """Raised when an operation is invalid for a process's current state.

    Used by ProcessLauncher when start() is called twice or a stop
    deferred is requested after the process has already completed.
    """
    pass
58
58
class UnknownStatus(Exception):
    """Raised when a process exit status is neither ProcessDone nor
    ProcessTerminated (which would indicate a bug in Twisted)."""
    pass
61
61
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Forwards child-process lifecycle events (start, exit, output) back to
    the owning ProcessLauncher.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive; hand its pid back to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit: report exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(reason, ProcessTerminated):
            # Abnormal exit: report the full exit-code/signal/status details.
            info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
            }
            self.process_launcher.fire_stop_deferred(info)
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Child stdout goes to the twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Child stderr goes to the twisted error log.
        log.err(data)
91
91
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes.  This is an awkward design and should be moved to using
    a formal NotificationCenter.

    States move before -> running -> after; see self.state.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the full list (including the
        # executable itself) is passed as argv to spawnProcess.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        """Return all bookkeeping to the pristine 'before' state."""
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # before, running, or after

    @property
    def running(self):
        """True while the child process is alive."""
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        """Called by the protocol once the process has actually started."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; return a deferred that fires with its pid.

        Returns a failed deferred (ProcessStateError) if the process was
        already started.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires with the exit status when the
        process ends, or a failed deferred if it already has."""
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        """Called by the protocol when the process ends; fires every
        deferred handed out by get_stop_deferred."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Silently does nothing unless the process is running.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now and schedule a KILL after `delay` seconds, giving
        the process a chance to exit cleanly first."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
171
171
172
172
173 #-----------------------------------------------------------------------------
173 #-----------------------------------------------------------------------------
174 # Code for launching controller and engines
174 # Code for launching controller and engines
175 #-----------------------------------------------------------------------------
175 #-----------------------------------------------------------------------------
176
176
177
177
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher that starts an ipcontroller."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        if sys.platform == 'win32':
            # The ipcontroller script doesn't always get installed in the
            # same way or in the same location, so resolve it from the
            # package itself.
            from IPython.kernel.scripts import ipcontroller
            script = ipcontroller.__file__.replace('.pyc', '.py')
            # -u (unbuffered output) is required on Win32 to prevent weird
            # conflicts and problems with Twisted.
            args = [find_exe('python'), '-u', script]
        else:
            args = ['ipcontroller']
        if extra_args is not None:
            args.extend(extra_args)
        ProcessLauncher.__init__(self, args)
196
196
197
197
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher that starts a single ipengine."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        if sys.platform == 'win32':
            # The ipengine script doesn't always get installed in the same
            # way or in the same location, so resolve it from the package
            # itself.
            from IPython.kernel.scripts import ipengine
            script = ipengine.__file__.replace('.pyc', '.py')
            # -u (unbuffered output) is required on Win32 to prevent weird
            # conflicts and problems with Twisted.
            args = [find_exe('python'), '-u', script]
        else:
            args = ['ipengine']
        if extra_args is not None:
            args.extend(extra_args)
        ProcessLauncher.__init__(self, args)
216
216
217
217
class LocalEngineSet(object):
    """Manage a group of ipengine processes launched on the local host."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start n engines; returns a deferred firing with their pids."""
        dlist = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            d = launcher.start()
            self.launchers.append(launcher)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        # Log the pid list gathered from all launchers.
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send sig to every engine; returns a deferred of their exit statuses."""
        dlist = []
        for launcher in self.launchers:
            # Grab the stop deferred before signalling, per launcher.
            dlist.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """INT every engine now, KILL after `delay` seconds."""
        dlist = []
        for launcher in self.launchers:
            dlist.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
262
262
263
263
class BatchEngineSet(object):
    """Start engines through a batch queue system.

    Subclasses must fill in submit_command, delete_command and
    job_id_regexp.  See PBSEngineSet.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        # kwargs become the substitution context for the batch template.
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract the job id from the submit command's output.

        Raises Exception when job_id_regexp does not match.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the batch template for n engines and write it out."""
        self.context['n'] = n
        # Close the template file promptly instead of leaking the handle
        # (was open(...).read() with no close).
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        """Print and re-raise a failure from the submit command."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Write the batch script and submit it; returns a deferred job id."""
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
            [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Ask the queue system to delete the submitted job."""
        d = getProcessOutput(self.delete_command,
            [self.job_id],env=os.environ)
        return d
313
313
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet for the PBS queue system (qsub/qdel)."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string so the backslash reaches the regex engine untouched
    # (plain '\d+' is an invalid escape sequence in modern Python).
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
322
322
323
323
# Shell helper copied to each remote host: runs its arguments detached in
# the background and echoes the background job's pid.
# NOTE(review): '&>' is a bash extension; under a strictly POSIX /bin/sh it
# parses as '... & > /dev/null' (background the command, then a separate
# empty redirection), leaving the command's output attached -- confirm the
# target hosts' /bin/sh is bash-compatible.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Shell helper copied to each remote host: sends TERM to every ipengine
# process owned by the current user (the '[i]pengine' trick keeps the grep
# itself out of the match).
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
332
332
class SSHEngineSet(object):
    """Start engines on remote hosts over ssh, using small helper scripts
    copied to each host."""

    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values.  sshx is the name of a local
        file that will be used to run remote commands.  This file is used
        to setup the environment properly.
        """
        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        """Start all engines on all hosts; optionally scp the furl first."""
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        """Copy helpers to hostname and launch `count` engines there."""
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        """Copy the controller's engine furl file to the remote host."""
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # Expand '~' locally; scp would otherwise receive it literally.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        """Copy the sshx helper script to the remote host."""
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        # (Removed a stray bare 'print' that emitted a blank line here.)
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        """Run the engine command `count` times on the remote host."""
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill every engine on every host."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        """Copy the killer script to hostname and run it."""
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        """Copy the engine killer script to the remote host."""
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        """Run the engine killer script on the remote host."""
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        # Currently unused (see commented errback in _killall); kept for
        # debugging remote command failures.
        log.msg(r)
457
457
458 #-----------------------------------------------------------------------------
458 #-----------------------------------------------------------------------------
459 # Main functions for the different types of clusters
459 # Main functions for the different types of clusters
460 #-----------------------------------------------------------------------------
460 #-----------------------------------------------------------------------------
461
461
462 # TODO:
462 # TODO:
463 # The logic in these codes should be moved into classes like LocalCluster
463 # The logic in these codes should be moved into classes like LocalCluster
464 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplications.
464 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplications.
465 # The main functions should then just parse the command line arguments, create
465 # The main functions should then just parse the command line arguments, create
466 # the appropriate class and call a 'start' method.
466 # the appropriate class and call a 'start' method.
467
467
def check_security(args, cont_args):
    """Validate the -x/-y security flags against pyOpenSSL availability.

    Appends '-x'/'-y' to cont_args as requested and returns True; if a
    secure mode is needed but OpenSSL/pyOpenSSL is missing, stops the
    reactor and returns False.
    """
    wants_security = not args.x or not args.y
    if wants_security and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True
480
480
481
481
def main_local(args):
    """Start a local cluster: one ipcontroller plus args.n ipengine
    subprocesses, with SIGINT wired to a clean staged shutdown."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Engine log files are keyed by the controller's pid so separate
        # runs don't collide.
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        # Ctrl-C tears down engines and controller, then stops the reactor.
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
514
514
515
515
def main_mpi(args):
    """Start a cluster using mpirun/mpiexec.

    The controller is launched locally; the engines are started in a single
    ``args.cmd`` (mpirun or mpiexec) invocation with ``-n args.n`` processes.
    """
    # Fix: removed leftover debug `print vars(args)` — no other main_*
    # entry point dumps the argument namespace to stdout.
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Build the mpirun/mpiexec command line that launches ipengine.
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
553
553
554
554
def main_pbs(args):
    """Start a local controller and a set of engines submitted through PBS."""
    controller_args = ['--logfile=%s' % pjoin(args.logdir,'ipcontroller')]

    # Bail out early if the security options are inconsistent.
    if not check_security(args, controller_args):
        return

    controller = ControllerLauncher(extra_args=controller_args)
    started = controller.start()

    def start_engines(r):
        # Submit the engine job via the user-supplied PBS script template.
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: controller.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    started.addCallback(start_engines)
    started.addErrback(lambda f: f.raiseException())
577
577
578
578
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    # The clusterfile is trusted local configuration, executed as Python.
    # NOTE(review): execfile is Python 2 only, like the rest of this script.
    execfile(args.clusterfile, clusterfile)
    # Fix: use the idiomatic membership test instead of deprecated has_key().
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d

    def delay_start(cont_pid):
        # Give the controller time to write its furl files before the
        # engines try to pick them up.
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
620
619
621
620
def get_args():
    """Build the ipcluster command-line parser and parse sys.argv.

    Returns the parsed namespace; ``args.func`` holds the ``main_*``
    handler selected by the chosen subcommand.
    """
    # Options shared by every cluster type.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    common.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    common.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    common.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
        THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[common]
    )
    local.set_defaults(func=main_local)

    # mpirun and mpiexec differ only in their name/help text and the
    # launch command passed through to main_mpi.
    for mpi_name, mpi_help in (
            ('mpirun', 'run a cluster using mpirun (mpiexec also works)'),
            ('mpiexec', 'run a cluster using mpiexec (mpirun also works)')):
        mpi_parser = subparsers.add_parser(
            mpi_name,
            help=mpi_help,
            parents=[common]
        )
        mpi_parser.add_argument(
            "--mpi",
            type=str,
            dest="mpi", # Don't put a default here to allow no MPI support
            help="how to call MPI_Init (default=mpi4py)"
        )
        mpi_parser.set_defaults(func=main_mpi, cmd=mpi_name)

    pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[common]
    )
    pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    pbs.set_defaults(func=main_pbs)

    ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[common]
    )
    ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    ssh.set_defaults(func=main_ssh)

    return parser.parse_args()
729
728
def main():
    """Entry point: parse arguments, schedule the selected cluster-type
    handler on the reactor, and run the Twisted event loop."""
    parsed = get_args()
    reactor.callWhenRunning(parsed.func, parsed)
    log.startLogging(sys.stdout)
    reactor.run()

if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now