##// END OF EJS Templates
Initial version of working refactored ipcluster....
Brian Granger -
Show More
This diff has been collapsed as it changes many lines, (676 lines changed) Show them Hide them
@@ -1,347 +1,381 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """Start an IPython cluster conveniently, either locally or remotely.
5
6 Basic usage
7 -----------
8
9 For local operation, the simplest mode of usage is:
10
11 %prog -n N
12
13 where N is the number of engines you want started.
14
15 For remote operation, you must call it with a cluster description file:
16
17 %prog -f clusterfile.py
18
19 The cluster file is a normal Python script which gets run via execfile(). You
20 can have arbitrary logic in it, but all that matters is that at the end of the
21 execution, it declares the variables 'controller', 'engines', and optionally
22 'sshx'. See the accompanying examples for details on what these variables must
23 contain.
24
25
26 Notes
27 -----
28
29 WARNING: this code is still UNFINISHED and EXPERIMENTAL! It is incomplete,
30 some listed options are not really implemented, and all of its interfaces are
31 subject to change.
32
33 When operating over SSH for a remote cluster, this program relies on the
34 existence of a particular script called 'sshx'. This script must live in the
35 target systems where you'll be running your controller and engines, and is
36 needed to configure your PATH and PYTHONPATH variables for further execution of
37 python code at the other end of an SSH connection. The script can be as simple
38 as:
39
40 #!/bin/sh
41 . $HOME/.bashrc
42 "$@"
43
44 which is the default one provided by IPython. You can modify this or provide
45 your own. Since it's quite likely that for different clusters you may need
46 this script to configure things differently or that it may live in different
47 locations, its full path can be set in the same file where you define the
48 cluster setup. IPython's order of evaluation for this variable is the
49 following:
50
51 a) Internal default: 'sshx'. This only works if it is in the default system
52 path which SSH sets up in non-interactive mode.
53
54 b) Environment variable: if $IPYTHON_SSHX is defined, this overrides the
55 internal default.
56
57 c) Variable 'sshx' in the cluster configuration file: finally, this will
58 override the previous two values.
59
60 This code is Unix-only, with precious little hope of any of this ever working
61 under Windows, since we need SSH from the ground up, we background processes,
62 etc. Ports of this functionality to Windows are welcome.
63
64
65 Call summary
66 ------------
67
68 %prog [options]
69 """
70
71 __docformat__ = "restructuredtext en"
72
73 #-------------------------------------------------------------------------------
74 # Copyright (C) 2008 The IPython Development Team
75 #
76 # Distributed under the terms of the BSD License. The full license is in
77 # the file COPYING, distributed as part of this software.
78 #-------------------------------------------------------------------------------
79
80 #-------------------------------------------------------------------------------
81 # Stdlib imports
82 #-------------------------------------------------------------------------------
83
84 import os
1 import os
85 import signal
2 import re
86 import sys
3 import sys
87 import time
4 import signal
5 pjoin = os.path.join
88
6
89 from optparse import OptionParser
7 from twisted.internet import reactor, defer
90 from subprocess import Popen,call
8 from twisted.internet.protocol import ProcessProtocol
9 from twisted.python import failure, log
10 from twisted.internet.error import ProcessDone, ProcessTerminated
11 from twisted.internet.utils import getProcessOutput
91
12
92 #---------------------------------------------------------------------------
13 from IPython.external import argparse
93 # IPython imports
14 from IPython.external import Itpl
94 #---------------------------------------------------------------------------
15 from IPython.kernel.twistedutil import gatherBoth
95 from IPython.tools import utils
16 from IPython.kernel.util import printer
96 from IPython.genutils import get_ipython_dir
17 from IPython.genutils import get_ipython_dir
97
18
98 #---------------------------------------------------------------------------
99 # Normal code begins
100 #---------------------------------------------------------------------------
101
19
102 def parse_args():
20 # Test local cluster on Win32
103 """Parse command line and return opts,args."""
21 # Look at local cluster usage strings
22 # PBS stuff
104
23
105 parser = OptionParser(usage=__doc__)
24 class ProcessStateError(Exception):
106 newopt = parser.add_option # shorthand
25 pass
107
108 newopt("--controller-port", type="int", dest="controllerport",
109 help="the TCP port the controller is listening on")
110
111 newopt("--controller-ip", type="string", dest="controllerip",
112 help="the TCP ip address of the controller")
113
114 newopt("-n", "--num", type="int", dest="n",default=2,
115 help="the number of engines to start")
116
117 newopt("--engine-port", type="int", dest="engineport",
118 help="the TCP port the controller will listen on for engine "
119 "connections")
120
121 newopt("--engine-ip", type="string", dest="engineip",
122 help="the TCP ip address the controller will listen on "
123 "for engine connections")
124
125 newopt("--mpi", type="string", dest="mpi",
126 help="use mpi with package: for instance --mpi=mpi4py")
127
128 newopt("-l", "--logfile", type="string", dest="logfile",
129 help="log file name")
130
131 newopt('-f','--cluster-file',dest='clusterfile',
132 help='file describing a remote cluster')
133
134 return parser.parse_args()
135
136 def numAlive(controller,engines):
137 """Return the number of processes still alive."""
138 retcodes = [controller.poll()] + \
139 [e.poll() for e in engines]
140 return retcodes.count(None)
141
142 stop = lambda pid: os.kill(pid,signal.SIGINT)
143 kill = lambda pid: os.kill(pid,signal.SIGTERM)
144
145 def cleanup(clean,controller,engines):
146 """Stop the controller and engines with the given cleanup method."""
147
148 for e in engines:
149 if e.poll() is None:
150 print 'Stopping engine, pid',e.pid
151 clean(e.pid)
152 if controller.poll() is None:
153 print 'Stopping controller, pid',controller.pid
154 clean(controller.pid)
155
26
27 class UnknownStatus(Exception):
28 pass
156
29
157 def ensureDir(path):
30 class LauncherProcessProtocol(ProcessProtocol):
158 """Ensure a directory exists or raise an exception."""
31 """
159 if not os.path.isdir(path):
32 A ProcessProtocol to go with the ProcessLauncher.
160 os.makedirs(path)
33 """
34 def __init__(self, process_launcher):
35 self.process_launcher = process_launcher
36
37 def connectionMade(self):
38 self.process_launcher.fire_start_deferred(self.transport.pid)
39
40 def processEnded(self, status):
41 value = status.value
42 if isinstance(value, ProcessDone):
43 self.process_launcher.fire_stop_deferred(0)
44 elif isinstance(value, ProcessTerminated):
45 self.process_launcher.fire_stop_deferred(
46 {'exit_code':value.exitCode,
47 'signal':value.signal,
48 'status':value.status
49 }
50 )
51 else:
52 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
161
53
54 def outReceived(self, data):
55 log.msg(data)
162
56
163 def startMsg(control_host,control_port=10105):
57 def errReceived(self, data):
164 """Print a startup message"""
58 log.err(data)
165 print
166 print 'Your cluster is up and running.'
167 print
168 print 'For interactive use, you can make a MultiEngineClient with:'
169 print
170 print 'from IPython.kernel import client'
171 print "mec = client.MultiEngineClient()"
172 print
173 print 'You can then cleanly stop the cluster from IPython using:'
174 print
175 print 'mec.kill(controller=True)'
176 print
177
59
60 class ProcessLauncher(object):
61 """
62 Start and stop an external process in an asynchronous manner.
178
63
179 def clusterLocal(opt,arg):
64 Currently this uses deferreds to notify other parties of process state
180 """Start a cluster on the local machine."""
65 changes. This is an awkward design and should be moved to using
66 a formal NotificationCenter.
67 """
68 def __init__(self, cmd_and_args):
69
70 self.cmd = cmd_and_args[0]
71 self.args = cmd_and_args
72 self._reset()
73
74 def _reset(self):
75 self.process_protocol = None
76 self.pid = None
77 self.start_deferred = None
78 self.stop_deferreds = []
79 self.state = 'before' # before, running, or after
80
81 @property
82 def running(self):
83 if self.state == 'running':
84 return True
85 else:
86 return False
87
88 def fire_start_deferred(self, pid):
89 self.pid = pid
90 self.state = 'running'
91 log.msg('Process %r has started with pid=%i' % (self.args, pid))
92 self.start_deferred.callback(pid)
93
94 def start(self):
95 if self.state == 'before':
96 self.process_protocol = LauncherProcessProtocol(self)
97 self.start_deferred = defer.Deferred()
98 self.process_transport = reactor.spawnProcess(
99 self.process_protocol,
100 self.cmd,
101 self.args,
102 env=os.environ
103 )
104 return self.start_deferred
105 else:
106 s = 'the process has already been started and has state: %r' % \
107 self.state
108 return defer.fail(ProcessStateError(s))
109
110 def get_stop_deferred(self):
111 if self.state == 'running' or self.state == 'before':
112 d = defer.Deferred()
113 self.stop_deferreds.append(d)
114 return d
115 else:
116 s = 'this process is already complete'
117 return defer.fail(ProcessStateError(s))
181
118
182 # Store all logs inside the ipython directory
119 def fire_stop_deferred(self, exit_code):
183 ipdir = get_ipython_dir()
120 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
184 pjoin = os.path.join
121 self.state = 'after'
122 for d in self.stop_deferreds:
123 d.callback(exit_code)
185
124
186 logfile = opt.logfile
125 def signal(self, sig):
187 if logfile is None:
188 logdir_base = pjoin(ipdir,'log')
189 ensureDir(logdir_base)
190 logfile = pjoin(logdir_base,'ipcluster-')
191
192 print 'Starting controller:',
193 controller = Popen(['ipcontroller','--logfile',logfile,'-x','-y'])
194 print 'Controller PID:',controller.pid
195
196 print 'Starting engines: ',
197 time.sleep(5)
198
199 englogfile = '%s%s-' % (logfile,controller.pid)
200 mpi = opt.mpi
201 if mpi: # start with mpi - killing the engines with sigterm will not work if you do this
202 engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi',
203 mpi, '--logfile',englogfile])]
204 # engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi', mpi])]
205 else: # do what we would normally do
206 engines = [ Popen(['ipengine','--logfile',englogfile])
207 for i in range(opt.n) ]
208 eids = [e.pid for e in engines]
209 print 'Engines PIDs: ',eids
210 print 'Log files: %s*' % englogfile
211
212 proc_ids = eids + [controller.pid]
213 procs = engines + [controller]
214
215 grpid = os.getpgrp()
216 try:
217 startMsg('127.0.0.1')
218 print 'You can also hit Ctrl-C to stop it, or use from the cmd line:'
219 print
220 print 'kill -INT',grpid
221 print
222 try:
223 while True:
224 time.sleep(5)
225 except:
226 pass
227 finally:
228 print 'Stopping cluster. Cleaning up...'
229 cleanup(stop,controller,engines)
230 for i in range(4):
231 time.sleep(i+2)
232 nZombies = numAlive(controller,engines)
233 if nZombies== 0:
234 print 'OK: All processes cleaned up.'
235 break
236 print 'Trying again, %d processes did not stop...' % nZombies
237 cleanup(kill,controller,engines)
238 if numAlive(controller,engines) == 0:
239 print 'OK: All processes cleaned up.'
240 break
241 else:
242 print '*'*75
243 print 'ERROR: could not kill some processes, try to do it',
244 print 'manually.'
245 zombies = []
246 if controller.returncode is None:
247 print 'Controller is alive: pid =',controller.pid
248 zombies.append(controller.pid)
249 liveEngines = [ e for e in engines if e.returncode is None ]
250 for e in liveEngines:
251 print 'Engine is alive: pid =',e.pid
252 zombies.append(e.pid)
253 print
254 print 'Zombie summary:',' '.join(map(str,zombies))
255
256 def clusterRemote(opt,arg):
257 """Start a remote cluster over SSH"""
258
259 # B. Granger, 9/3/08
260 # The launching of a remote cluster using SSH and a clusterfile
261 # is broken. Because it won't be fixed before the 0.9 release,
262 # we are removing it. For now, we just print a message to the
263 # user and abort.
264
265 print """The launching of a remote IPython cluster using SSL
266 and a clusterfile has been removed in this release.
267 It has been broken for a while and we are in the process
268 of building a new process management system that will be
269 used to provide a more robust way of starting an IPython
270 cluster.
271
272 For now remote clusters have to be launched using ipcontroller
273 and ipengine separately.
274 """
126 """
275 sys.exit(1)
127 Send a signal to the process.
276
277 # Load the remote cluster configuration
278 clConfig = {}
279 execfile(opt.clusterfile,clConfig)
280 contConfig = clConfig['controller']
281 engConfig = clConfig['engines']
282 # Determine where to find sshx:
283 sshx = clConfig.get('sshx',os.environ.get('IPYTHON_SSHX','sshx'))
284
285 # Store all logs inside the ipython directory
286 ipdir = get_ipython_dir()
287 pjoin = os.path.join
288
128
289 logfile = opt.logfile
129 The argument sig can be ('KILL','INT', etc.) or any signal number.
290 if logfile is None:
130 """
291 logdir_base = pjoin(ipdir,'log')
131 if self.state == 'running':
292 ensureDir(logdir_base)
132 self.process_transport.signalProcess(sig)
293 logfile = pjoin(logdir_base,'ipcluster')
133
294
134 def __del__(self):
295 # Append this script's PID to the logfile name always
135 self.signal('KILL')
296 logfile = '%s-%s' % (logfile,os.getpid())
136
297
137 def interrupt_then_kill(self, delay=1.0):
298 print 'Starting controller:'
138 self.signal('INT')
299 # Controller data:
139 reactor.callLater(delay, self.signal, 'KILL')
300 xsys = os.system
140
301
141
302 contHost = contConfig['host']
142 class ControllerLauncher(ProcessLauncher):
303 contLog = '%s-con-%s-' % (logfile,contHost)
143
304 cmd = "ssh %s '%s' 'ipcontroller --logfile %s' &" % \
144 def __init__(self, extra_args=None):
305 (contHost,sshx,contLog)
145 self.args = ['ipcontroller']
306 #print 'cmd:<%s>' % cmd # dbg
146 self.extra_args = extra_args
307 xsys(cmd)
147 if extra_args is not None:
308 time.sleep(2)
148 self.args.extend(extra_args)
309
149
310 print 'Starting engines: '
150 ProcessLauncher.__init__(self, self.args)
311 for engineHost,engineData in engConfig.iteritems():
151
312 if isinstance(engineData,int):
152
313 numEngines = engineData
class EngineLauncher(ProcessLauncher):
    """Launcher for a single ``ipengine`` process.

    The command line is ``['ipengine']`` followed by any ``extra_args``;
    it is handed to ``ProcessLauncher.__init__`` unchanged.
    """

    def __init__(self, extra_args=None):
        # Keep the caller's value as given (may be None).
        self.extra_args = extra_args
        command = ['ipengine']
        if extra_args is not None:
            command.extend(extra_args)
        self.args = command

        ProcessLauncher.__init__(self, self.args)
162
163
class LocalEngineSet(object):
    """A set of engines, each driven by its own EngineLauncher.

    ``extra_args`` is forwarded to every EngineLauncher that ``start``
    creates; the launchers themselves are kept in ``self.launchers``.
    """

    def __init__(self, extra_args=None):
        self.launchers = []
        self.extra_args = extra_args

    def start(self, n):
        """Create and start ``n`` engine launchers.

        Returns the deferred built by ``gatherBoth`` over the individual
        start deferreds, with ``_handle_start`` attached as a callback.
        """
        start_deferreds = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            started = launcher.start()
            self.launchers.append(launcher)
            start_deferreds.append(started)
        gathered = gatherBoth(start_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_start)
        return gathered

    def _handle_start(self, r):
        """Log the gathered start result and pass it through."""
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        """Log the gathered stop result and pass it through."""
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send ``sig`` to every launcher.

        A stop deferred is requested from each launcher *before* it is
        signalled; the gathered stop deferreds are returned.
        """
        stop_deferreds = []
        for launcher in self.launchers:
            stop_deferreds.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        gathered = gatherBoth(stop_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_stop)
        return gathered

    def interrupt_then_kill(self, delay=1.0):
        """Call ``interrupt_then_kill(delay)`` on every launcher.

        A stop deferred is requested from each launcher first; the
        gathered stop deferreds are returned.
        """
        stop_deferreds = []
        for launcher in self.launchers:
            stop_deferreds.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        gathered = gatherBoth(stop_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_stop)
        return gathered
208
209
210 class BatchEngineSet(object):
211
212 # Subclasses must fill these in. See PBSEngineSet
213 submit_command = ''
214 delete_command = ''
215 job_id_regexp = ''
216
217 def __init__(self, template_file, **kwargs):
218 self.template_file = template_file
219 self.context = {}
220 self.context.update(kwargs)
221 self.batch_file = 'batch-script'
222
223 def parse_job_id(self, output):
224 m = re.match(self.job_id_regexp, output)
225 if m is not None:
226 job_id = m.group()
314 else:
227 else:
315 raise NotImplementedError('port configuration not finished for engines')
228 raise Exception("job id couldn't be determined: %s" % output)
229 self.job_id = job_id
230 print 'Job started with job id:', job_id
231 return job_id
232
233 def write_batch_script(self, n):
234 print 'n', n
235 self.context['n'] = n
236 template = open(self.template_file, 'r').read()
237 print 'template', template
238 script_as_string = Itpl.itplns(template, self.context)
239 print 'script', script_as_string
240 f = open(self.batch_file,'w')
241 f.write(script_as_string)
242 f.close()
243
244 def handle_error(self, f):
245 f.printTraceback()
246 f.raiseException()
247
248 def start(self, n):
249 self.write_batch_script(n)
250 d = getProcessOutput(self.submit_command,
251 [self.batch_file],env=os.environ)
252 d.addCallback(self.parse_job_id)
253 #d.addErrback(self.handle_error)
254 return d
255
256 def kill(self):
257 d = getProcessOutput(self.delete_command,
258 [self.job_id],env=os.environ)
259 return d
260
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet specialised with the PBS command names.

    Only the class-level settings differ from BatchEngineSet; the
    constructor simply delegates to the base class.
    """

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string: the backslash must reach the regex engine as-is rather
    # than relying on '\d' being an unrecognised string escape.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
269
270
def main_local(args):
    """Start a controller and then ``args.n`` engines on this machine.

    ``args`` must provide ``logdir`` and ``n``. The controller is started
    first; one second after it reports its pid the engines are started.
    A SIGINT handler that shuts everything down and stops the reactor is
    installed when the engines are started.
    """
    # Both the controller and the engines are told to log under
    # args.logdir, so make sure that directory exists before launching.
    if not os.path.isdir(args.logdir):
        os.makedirs(args.logdir)

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Engine log names embed the controller pid.
        engine_args = [
            '--logfile=%s' % pjoin(args.logdir, 'ipengine%s-' % cont_pid)
        ]
        eset = LocalEngineSet(extra_args=engine_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
298
def main_mpirun(args):
    """Start a controller and then ``args.n`` engines through ``mpirun``.

    ``args`` must provide ``logdir``, ``n`` and ``mpi``. The controller is
    started first; one second after it reports its pid a single
    ``mpirun -n N ipengine ...`` process is launched. A SIGINT handler
    that shuts everything down and stops the reactor is installed when
    that process is started.
    """
    # Both the controller and the engines are told to log under
    # args.logdir, so make sure that directory exists before launching.
    if not os.path.isdir(args.logdir):
        os.makedirs(args.logdir)

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Engine log names embed the controller pid.
        raw_args = [
            'mpirun',
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir, 'ipengine%s-' % cont_pid),
            '--mpi=%s' % args.mpi,
        ]
        eset = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start()

    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
329
def main_pbs(args):
    """Start a controller, then submit the engines as a PBS job.

    The template comes from ``args.pbsscript`` (the ``--pbs-script``
    option) when given, otherwise ``'pbs.template'`` is used. The engine
    count comes from ``args.n`` when the namespace has it; the ``pbs``
    sub-command defines no ``-n`` option, so it falls back to 2, the
    default used by the ``local`` and ``mpirun`` sub-commands.
    """
    cl = ControllerLauncher()
    dstart = cl.start()

    def start_engines(r):
        # getattr keeps this working for namespaces that lack either
        # attribute instead of failing with AttributeError.
        template_file = getattr(args, 'pbsscript', None) or 'pbs.template'
        n = getattr(args, 'n', 2)
        pbs_set = PBSEngineSet(template_file)
        print(pbs_set.template_file)
        return pbs_set.start(n)

    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.printTraceback())
340
341
342 def get_args():
343 parser = argparse.ArgumentParser(
344 description='IPython cluster startup')
345 newopt = parser.add_argument # shorthand
346
347 subparsers = parser.add_subparsers(help='sub-command help')
348
349 parser_local = subparsers.add_parser('local', help='run a local cluster')
350 parser_local.add_argument("--logdir", type=str, dest="logdir",
351 help="directory to put log files (default=$IPYTHONDIR/log)",
352 default=pjoin(get_ipython_dir(),'log'))
353 parser_local.add_argument("-n", "--num", type=int, dest="n",default=2,
354 help="the number of engines to start")
355 parser_local.set_defaults(func=main_local)
316
356
317 print 'Sarting %d engines on %s' % (numEngines,engineHost)
357 parser_local = subparsers.add_parser('mpirun', help='run a cluster using mpirun')
318 engLog = '%s-eng-%s-' % (logfile,engineHost)
358 parser_local.add_argument("--logdir", type=str, dest="logdir",
319 for i in range(numEngines):
359 help="directory to put log files (default=$IPYTHONDIR/log)",
320 cmd = "ssh %s '%s' 'ipengine --controller-ip %s --logfile %s' &" % \
360 default=pjoin(get_ipython_dir(),'log'))
321 (engineHost,sshx,contHost,engLog)
361 parser_local.add_argument("-n", "--num", type=int, dest="n",default=2,
322 #print 'cmd:<%s>' % cmd # dbg
362 help="the number of engines to start")
323 xsys(cmd)
363 parser_local.add_argument("--mpi", type=str, dest="mpi",default='mpi4py',
324 # Wait after each host a little bit
364 help="how to call MPI_Init (default=mpi4py)")
325 time.sleep(1)
365 parser_local.set_defaults(func=main_mpirun)
326
366
327 startMsg(contConfig['host'])
367 parser_pbs = subparsers.add_parser('pbs', help='run a pbs cluster')
368 parser_pbs.add_argument('--pbs-script', type=str, dest='pbsscript',
369 help='PBS script template')
370 parser_pbs.set_defaults(func=main_pbs)
371 args = parser.parse_args()
372 return args
328
373
329 def main():
374 def main():
330 """Main driver for the two big options: local or remote cluster."""
375 args = get_args()
331
376 reactor.callWhenRunning(args.func, args)
332 if sys.platform=='win32':
377 log.startLogging(sys.stdout)
333 print """ipcluster does not work on Microsoft Windows. Please start
378 reactor.run()
334 your IPython cluster using the ipcontroller and ipengine scripts."""
335 sys.exit(1)
336
337 opt,arg = parse_args()
338
339 clusterfile = opt.clusterfile
340 if clusterfile:
341 clusterRemote(opt,arg)
342 else:
343 clusterLocal(opt,arg)
344
345
379
346 if __name__=='__main__':
380 if __name__ == '__main__':
347 main()
381 main()
General Comments 0
You need to be logged in to leave comments. Login now