##// END OF EJS Templates
Initial version of working refactored ipcluster....
Brian Granger -
Show More
This diff has been collapsed as it changes many lines, (660 lines changed) Show them Hide them
@@ -1,347 +1,381 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """Start an IPython cluster conveniently, either locally or remotely.
5
6 Basic usage
7 -----------
8
9 For local operation, the simplest mode of usage is:
10
11 %prog -n N
12
13 where N is the number of engines you want started.
14
15 For remote operation, you must call it with a cluster description file:
16
17 %prog -f clusterfile.py
18
19 The cluster file is a normal Python script which gets run via execfile(). You
20 can have arbitrary logic in it, but all that matters is that at the end of the
21 execution, it declares the variables 'controller', 'engines', and optionally
22 'sshx'. See the accompanying examples for details on what these variables must
23 contain.
24
25
26 Notes
27 -----
28
29 WARNING: this code is still UNFINISHED and EXPERIMENTAL! It is incomplete,
30 some listed options are not really implemented, and all of its interfaces are
31 subject to change.
32
33 When operating over SSH for a remote cluster, this program relies on the
34 existence of a particular script called 'sshx'. This script must live in the
35 target systems where you'll be running your controller and engines, and is
36 needed to configure your PATH and PYTHONPATH variables for further execution of
37 python code at the other end of an SSH connection. The script can be as simple
38 as:
39
40 #!/bin/sh
41 . $HOME/.bashrc
42 "$@"
43
44 which is the default one provided by IPython. You can modify this or provide
45 your own. Since it's quite likely that for different clusters you may need
46 this script to configure things differently or that it may live in different
47 locations, its full path can be set in the same file where you define the
48 cluster setup. IPython's order of evaluation for this variable is the
49 following:
50
51 a) Internal default: 'sshx'. This only works if it is in the default system
52 path which SSH sets up in non-interactive mode.
53
54 b) Environment variable: if $IPYTHON_SSHX is defined, this overrides the
55 internal default.
56
57 c) Variable 'sshx' in the cluster configuration file: finally, this will
58 override the previous two values.
59
60 This code is Unix-only, with precious little hope of any of this ever working
61 under Windows, since we need SSH from the ground up, we background processes,
62 etc. Ports of this functionality to Windows are welcome.
63
64
65 Call summary
66 ------------
67
68 %prog [options]
69 """
70
71 __docformat__ = "restructuredtext en"
72
73 #-------------------------------------------------------------------------------
74 # Copyright (C) 2008 The IPython Development Team
75 #
76 # Distributed under the terms of the BSD License. The full license is in
77 # the file COPYING, distributed as part of this software.
78 #-------------------------------------------------------------------------------
79
80 #-------------------------------------------------------------------------------
81 # Stdlib imports
82 #-------------------------------------------------------------------------------
83
84 import os
1 import os
85 import signal
2 import re
86 import sys
3 import sys
87 import time
4 import signal
88
5 pjoin = os.path.join
89 from optparse import OptionParser
6
90 from subprocess import Popen,call
7 from twisted.internet import reactor, defer
91
8 from twisted.internet.protocol import ProcessProtocol
92 #---------------------------------------------------------------------------
9 from twisted.python import failure, log
93 # IPython imports
10 from twisted.internet.error import ProcessDone, ProcessTerminated
94 #---------------------------------------------------------------------------
11 from twisted.internet.utils import getProcessOutput
95 from IPython.tools import utils
12
13 from IPython.external import argparse
14 from IPython.external import Itpl
15 from IPython.kernel.twistedutil import gatherBoth
16 from IPython.kernel.util import printer
96 from IPython.genutils import get_ipython_dir
17 from IPython.genutils import get_ipython_dir
97
18
98 #---------------------------------------------------------------------------
99 # Normal code begins
100 #---------------------------------------------------------------------------
101
102 def parse_args():
103 """Parse command line and return opts,args."""
104
105 parser = OptionParser(usage=__doc__)
106 newopt = parser.add_option # shorthand
107
19
108 newopt("--controller-port", type="int", dest="controllerport",
20 # Test local cluster on Win32
109 help="the TCP port the controller is listening on")
21 # Look at local cluster usage strings
22 # PBS stuff
110
23
111 newopt("--controller-ip", type="string", dest="controllerip",
24 class ProcessStateError(Exception):
112 help="the TCP ip address of the controller")
25 pass
113
26
114 newopt("-n", "--num", type="int", dest="n",default=2,
27 class UnknownStatus(Exception):
115 help="the number of engines to start")
28 pass
116
29
117 newopt("--engine-port", type="int", dest="engineport",
30 class LauncherProcessProtocol(ProcessProtocol):
118 help="the TCP port the controller will listen on for engine "
31 """
119 "connections")
32 A ProcessProtocol to go with the ProcessLauncher.
33 """
34 def __init__(self, process_launcher):
35 self.process_launcher = process_launcher
120
36
121 newopt("--engine-ip", type="string", dest="engineip",
37 def connectionMade(self):
122 help="the TCP ip address the controller will listen on "
38 self.process_launcher.fire_start_deferred(self.transport.pid)
123 "for engine connections")
124
125 newopt("--mpi", type="string", dest="mpi",
126 help="use mpi with package: for instance --mpi=mpi4py")
127
39
128 newopt("-l", "--logfile", type="string", dest="logfile",
40 def processEnded(self, status):
129 help="log file name")
41 value = status.value
130
42 if isinstance(value, ProcessDone):
131 newopt('-f','--cluster-file',dest='clusterfile',
43 self.process_launcher.fire_stop_deferred(0)
132 help='file describing a remote cluster')
44 elif isinstance(value, ProcessTerminated):
133
45 self.process_launcher.fire_stop_deferred(
134 return parser.parse_args()
46 {'exit_code':value.exitCode,
47 'signal':value.signal,
48 'status':value.status
49 }
50 )
51 else:
52 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
135
53
136 def numAlive(controller,engines):
54 def outReceived(self, data):
137 """Return the number of processes still alive."""
55 log.msg(data)
138 retcodes = [controller.poll()] + \
139 [e.poll() for e in engines]
140 return retcodes.count(None)
141
56
142 stop = lambda pid: os.kill(pid,signal.SIGINT)
57 def errReceived(self, data):
143 kill = lambda pid: os.kill(pid,signal.SIGTERM)
58 log.err(data)
144
59
145 def cleanup(clean,controller,engines):
60 class ProcessLauncher(object):
146 """Stop the controller and engines with the given cleanup method."""
61 """
62 Start and stop an external process in an asynchronous manner.
147
63
148 for e in engines:
64 Currently this uses deferreds to notify other parties of process state
149 if e.poll() is None:
65 changes. This is an awkward design and should be moved to using
150 print 'Stopping engine, pid',e.pid
66 a formal NotificationCenter.
151 clean(e.pid)
67 """
152 if controller.poll() is None:
68 def __init__(self, cmd_and_args):
153 print 'Stopping controller, pid',controller.pid
154 clean(controller.pid)
155
156
def ensureDir(path):
    """Ensure that directory `path` exists, creating it (and any missing
    parents) if necessary.

    Uses EAFP instead of check-then-create to avoid the race where the
    directory appears (or vanishes) between an isdir() test and
    makedirs().  Raises OSError if the path cannot be created, e.g. it
    already exists as a regular file.
    """
    try:
        os.makedirs(path)
    except OSError:
        # makedirs raises EEXIST when the directory is already there;
        # that case is benign -- re-raise anything else.
        if not os.path.isdir(path):
            raise
161
162
163 def startMsg(control_host,control_port=10105):
164 """Print a startup message"""
165 print
166 print 'Your cluster is up and running.'
167 print
168 print 'For interactive use, you can make a MultiEngineClient with:'
169 print
170 print 'from IPython.kernel import client'
171 print "mec = client.MultiEngineClient()"
172 print
173 print 'You can then cleanly stop the cluster from IPython using:'
174 print
175 print 'mec.kill(controller=True)'
176 print
177
69
70 self.cmd = cmd_and_args[0]
71 self.args = cmd_and_args
72 self._reset()
178
73
179 def clusterLocal(opt,arg):
74 def _reset(self):
180 """Start a cluster on the local machine."""
75 self.process_protocol = None
76 self.pid = None
77 self.start_deferred = None
78 self.stop_deferreds = []
79 self.state = 'before' # before, running, or after
181
80
182 # Store all logs inside the ipython directory
81 @property
183 ipdir = get_ipython_dir()
82 def running(self):
184 pjoin = os.path.join
83 if self.state == 'running':
185
84 return True
186 logfile = opt.logfile
85 else:
187 if logfile is None:
86 return False
188 logdir_base = pjoin(ipdir,'log')
189 ensureDir(logdir_base)
190 logfile = pjoin(logdir_base,'ipcluster-')
191
192 print 'Starting controller:',
193 controller = Popen(['ipcontroller','--logfile',logfile,'-x','-y'])
194 print 'Controller PID:',controller.pid
195
196 print 'Starting engines: ',
197 time.sleep(5)
198
199 englogfile = '%s%s-' % (logfile,controller.pid)
200 mpi = opt.mpi
201 if mpi: # start with mpi - killing the engines with sigterm will not work if you do this
202 engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi',
203 mpi, '--logfile',englogfile])]
204 # engines = [Popen(['mpirun', '-np', str(opt.n), 'ipengine', '--mpi', mpi])]
205 else: # do what we would normally do
206 engines = [ Popen(['ipengine','--logfile',englogfile])
207 for i in range(opt.n) ]
208 eids = [e.pid for e in engines]
209 print 'Engines PIDs: ',eids
210 print 'Log files: %s*' % englogfile
211
87
212 proc_ids = eids + [controller.pid]
88 def fire_start_deferred(self, pid):
213 procs = engines + [controller]
89 self.pid = pid
214
90 self.state = 'running'
215 grpid = os.getpgrp()
91 log.msg('Process %r has started with pid=%i' % (self.args, pid))
216 try:
92 self.start_deferred.callback(pid)
217 startMsg('127.0.0.1')
93
218 print 'You can also hit Ctrl-C to stop it, or use from the cmd line:'
94 def start(self):
219 print
95 if self.state == 'before':
220 print 'kill -INT',grpid
96 self.process_protocol = LauncherProcessProtocol(self)
221 print
97 self.start_deferred = defer.Deferred()
222 try:
98 self.process_transport = reactor.spawnProcess(
223 while True:
99 self.process_protocol,
224 time.sleep(5)
100 self.cmd,
225 except:
101 self.args,
226 pass
102 env=os.environ
227 finally:
103 )
228 print 'Stopping cluster. Cleaning up...'
104 return self.start_deferred
229 cleanup(stop,controller,engines)
230 for i in range(4):
231 time.sleep(i+2)
232 nZombies = numAlive(controller,engines)
233 if nZombies== 0:
234 print 'OK: All processes cleaned up.'
235 break
236 print 'Trying again, %d processes did not stop...' % nZombies
237 cleanup(kill,controller,engines)
238 if numAlive(controller,engines) == 0:
239 print 'OK: All processes cleaned up.'
240 break
241 else:
105 else:
242 print '*'*75
106 s = 'the process has already been started and has state: %r' % \
243 print 'ERROR: could not kill some processes, try to do it',
107 self.state
244 print 'manually.'
108 return defer.fail(ProcessStateError(s))
245 zombies = []
109
246 if controller.returncode is None:
110 def get_stop_deferred(self):
247 print 'Controller is alive: pid =',controller.pid
111 if self.state == 'running' or self.state == 'before':
248 zombies.append(controller.pid)
112 d = defer.Deferred()
249 liveEngines = [ e for e in engines if e.returncode is None ]
113 self.stop_deferreds.append(d)
250 for e in liveEngines:
114 return d
251 print 'Engine is alive: pid =',e.pid
115 else:
252 zombies.append(e.pid)
116 s = 'this process is already complete'
253 print
117 return defer.fail(ProcessStateError(s))
254 print 'Zombie summary:',' '.join(map(str,zombies))
118
255
119 def fire_stop_deferred(self, exit_code):
256 def clusterRemote(opt,arg):
120 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
257 """Start a remote cluster over SSH"""
121 self.state = 'after'
258
122 for d in self.stop_deferreds:
259 # B. Granger, 9/3/08
123 d.callback(exit_code)
260 # The launching of a remote cluster using SSH and a clusterfile
124
261 # is broken. Because it won't be fixed before the 0.9 release,
125 def signal(self, sig):
262 # we are removing it. For now, we just print a message to the
126 """
263 # user and abort.
127 Send a signal to the process.
128
129 The argument sig can be ('KILL','INT', etc.) or any signal number.
130 """
131 if self.state == 'running':
132 self.process_transport.signalProcess(sig)
133
134 def __del__(self):
135 self.signal('KILL')
264
136
265 print """The launching of a remote IPython cluster using SSL
137 def interrupt_then_kill(self, delay=1.0):
266 and a clusterfile has been removed in this release.
138 self.signal('INT')
267 It has been broken for a while and we are in the process
139 reactor.callLater(delay, self.signal, 'KILL')
268 of building a new process management system that will be
269 used to provide a more robust way of starting an IPython
270 cluster.
271
140
272 For now remote clusters have to be launched using ipcontroller
273 and ipengine separately.
274 """
275 sys.exit(1)
276
141
277 # Load the remote cluster configuration
142 class ControllerLauncher(ProcessLauncher):
278 clConfig = {}
279 execfile(opt.clusterfile,clConfig)
280 contConfig = clConfig['controller']
281 engConfig = clConfig['engines']
282 # Determine where to find sshx:
283 sshx = clConfig.get('sshx',os.environ.get('IPYTHON_SSHX','sshx'))
284
143
285 # Store all logs inside the ipython directory
144 def __init__(self, extra_args=None):
286 ipdir = get_ipython_dir()
145 self.args = ['ipcontroller']
287 pjoin = os.path.join
146 self.extra_args = extra_args
147 if extra_args is not None:
148 self.args.extend(extra_args)
149
150 ProcessLauncher.__init__(self, self.args)
288
151
289 logfile = opt.logfile
290 if logfile is None:
291 logdir_base = pjoin(ipdir,'log')
292 ensureDir(logdir_base)
293 logfile = pjoin(logdir_base,'ipcluster')
294
152
295 # Append this script's PID to the logfile name always
153 class EngineLauncher(ProcessLauncher):
296 logfile = '%s-%s' % (logfile,os.getpid())
297
154
298 print 'Starting controller:'
155 def __init__(self, extra_args=None):
299 # Controller data:
156 self.args = ['ipengine']
300 xsys = os.system
157 self.extra_args = extra_args
158 if extra_args is not None:
159 self.args.extend(extra_args)
160
161 ProcessLauncher.__init__(self, self.args)
301
162
302 contHost = contConfig['host']
303 contLog = '%s-con-%s-' % (logfile,contHost)
304 cmd = "ssh %s '%s' 'ipcontroller --logfile %s' &" % \
305 (contHost,sshx,contLog)
306 #print 'cmd:<%s>' % cmd # dbg
307 xsys(cmd)
308 time.sleep(2)
309
163
310 print 'Starting engines: '
164 class LocalEngineSet(object):
311 for engineHost,engineData in engConfig.iteritems():
165
312 if isinstance(engineData,int):
166 def __init__(self, extra_args=None):
313 numEngines = engineData
167 self.extra_args = extra_args
168 self.launchers = []
169
    def start(self, n):
        """Start `n` local engine processes.

        Returns a deferred (gatherBoth over the individual start
        deferreds) that fires once every engine has reported starting,
        with the list of per-engine results.
        """
        dlist = []
        for i in range(n):
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            # Keep the launcher so signal()/interrupt_then_kill() can
            # reach every engine later.
            self.launchers.append(el)
            dlist.append(d)
        # consumeErrors=True: collect individual failures rather than
        # letting one engine's failure mask the other results.
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal
180
    def _handle_start(self, r):
        # Callback for the gathered start deferreds: log the engine
        # pids and pass the result through unchanged.
        log.msg('Engines started with pids: %r' % r)
        return r
184
    def _handle_stop(self, r):
        # Callback for the gathered stop deferreds: log what the
        # engines reported on exit and pass the result through.
        log.msg('Engines received signal: %r' % r)
        return r
188
    def signal(self, sig):
        """Send signal `sig` to every launched engine.

        Returns a deferred that fires when all engines have stopped.
        """
        dlist = []
        for el in self.launchers:
            # Grab the stop deferred *before* signalling so a fast
            # exit cannot be missed.
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
198
    def interrupt_then_kill(self, delay=1.0):
        """Send INT to every engine, then KILL `delay` seconds later.

        Returns a deferred that fires when all engines have stopped.
        """
        dlist = []
        for el in self.launchers:
            # Collect the stop deferred before signalling (same
            # ordering as signal() above).
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
208
209
210 class BatchEngineSet(object):
211
212 # Subclasses must fill these in. See PBSEngineSet
213 submit_command = ''
214 delete_command = ''
215 job_id_regexp = ''
216
    def __init__(self, template_file, **kwargs):
        # Path to the batch-script template; any extra keyword args
        # become the interpolation context used when rendering it.
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        # The rendered script is written to this fixed local filename.
        self.batch_file = 'batch-script'
222
223 def parse_job_id(self, output):
224 m = re.match(self.job_id_regexp, output)
225 if m is not None:
226 job_id = m.group()
314 else:
227 else:
315 raise NotImplementedError('port configuration not finished for engines')
228 raise Exception("job id couldn't be determined: %s" % output)
316
229 self.job_id = job_id
317 print 'Sarting %d engines on %s' % (numEngines,engineHost)
230 print 'Job started with job id:', job_id
318 engLog = '%s-eng-%s-' % (logfile,engineHost)
231 return job_id
319 for i in range(numEngines):
232
320 cmd = "ssh %s '%s' 'ipengine --controller-ip %s --logfile %s' &" % \
233 def write_batch_script(self, n):
321 (engineHost,sshx,contHost,engLog)
234 print 'n', n
322 #print 'cmd:<%s>' % cmd # dbg
235 self.context['n'] = n
323 xsys(cmd)
236 template = open(self.template_file, 'r').read()
324 # Wait after each host a little bit
237 print 'template', template
325 time.sleep(1)
238 script_as_string = Itpl.itplns(template, self.context)
326
239 print 'script', script_as_string
327 startMsg(contConfig['host'])
240 f = open(self.batch_file,'w')
241 f.write(script_as_string)
242 f.close()
243
    def handle_error(self, f):
        # Errback: print the failure's traceback, then re-raise the
        # underlying exception so the error keeps propagating.
        f.printTraceback()
        f.raiseException()
247
    def start(self, n):
        """Submit a batch job that starts `n` engines.

        Renders the batch script, runs the scheduler's submit command
        on it, and returns a deferred firing with the parsed job id.
        """
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
            [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        #d.addErrback(self.handle_error)
        return d
328
255
329 def main():
256 def kill(self):
330 """Main driver for the two big options: local or remote cluster."""
257 d = getProcessOutput(self.delete_command,
258 [self.job_id],env=os.environ)
259 return d
260
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet for the PBS scheduler (qsub to submit, qdel to
    delete); job ids are the leading digits of qsub's output."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string: '\d' in a plain literal is an invalid escape sequence
    # on modern Pythons and fragile regex style in general.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
269
270
def main_local(args):
    """Start a controller and `args.n` engines on the local machine.

    Runs under the Twisted reactor; SIGINT is rebound to an orderly
    interrupt-then-kill shutdown of engines and controller.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Tag engine logfiles with the controller pid so separate runs
        # do not collide.
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
298
def main_mpirun(args):
    """Start a controller locally and `args.n` engines via mpirun.

    All engines are launched as one `mpirun -n N ipengine ...` child
    process; SIGINT triggers an interrupt-then-kill shutdown.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Build the mpirun command line; '-l' sets the engine logfile
        # prefix, tagged with the controller pid.
        raw_args = ['mpirun']
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
329
330 def main_pbs(args):
331 cl = ControllerLauncher()
332 dstart = cl.start()
333 def start_engines(r):
334 pbs_set = PBSEngineSet('pbs.template')
335 print pbs_set.template_file
336 d = pbs_set.start(args.n)
337 return d
338 dstart.addCallback(start_engines)
339 dstart.addErrback(lambda f: f.printTraceback())
340
341
342 def get_args():
343 parser = argparse.ArgumentParser(
344 description='IPython cluster startup')
345 newopt = parser.add_argument # shorthand
331
346
332 if sys.platform=='win32':
347 subparsers = parser.add_subparsers(help='sub-command help')
333 print """ipcluster does not work on Microsoft Windows. Please start
334 your IPython cluster using the ipcontroller and ipengine scripts."""
335 sys.exit(1)
336
348
337 opt,arg = parse_args()
349 parser_local = subparsers.add_parser('local', help='run a local cluster')
350 parser_local.add_argument("--logdir", type=str, dest="logdir",
351 help="directory to put log files (default=$IPYTHONDIR/log)",
352 default=pjoin(get_ipython_dir(),'log'))
353 parser_local.add_argument("-n", "--num", type=int, dest="n",default=2,
354 help="the number of engines to start")
355 parser_local.set_defaults(func=main_local)
356
357 parser_local = subparsers.add_parser('mpirun', help='run a cluster using mpirun')
358 parser_local.add_argument("--logdir", type=str, dest="logdir",
359 help="directory to put log files (default=$IPYTHONDIR/log)",
360 default=pjoin(get_ipython_dir(),'log'))
361 parser_local.add_argument("-n", "--num", type=int, dest="n",default=2,
362 help="the number of engines to start")
363 parser_local.add_argument("--mpi", type=str, dest="mpi",default='mpi4py',
364 help="how to call MPI_Init (default=mpi4py)")
365 parser_local.set_defaults(func=main_mpirun)
366
367 parser_pbs = subparsers.add_parser('pbs', help='run a pbs cluster')
368 parser_pbs.add_argument('--pbs-script', type=str, dest='pbsscript',
369 help='PBS script template')
370 parser_pbs.set_defaults(func=main_pbs)
371 args = parser.parse_args()
372 return args
338
373
339 clusterfile = opt.clusterfile
374 def main():
340 if clusterfile:
375 args = get_args()
341 clusterRemote(opt,arg)
376 reactor.callWhenRunning(args.func, args)
342 else:
377 log.startLogging(sys.stdout)
343 clusterLocal(opt,arg)
378 reactor.run()
344
379
345
380 if __name__ == '__main__':
346 if __name__=='__main__':
347 main()
381 main()
General Comments 0
You need to be logged in to leave comments. Login now