##// END OF EJS Templates
Initial version that works with PBS.
Brian Granger -
Show More
@@ -1,435 +1,451 b''
1 import os
1 import os
2 import re
2 import re
3 import sys
3 import sys
4 import signal
4 import signal
5 pjoin = os.path.join
5 pjoin = os.path.join
6
6
7 from twisted.internet import reactor, defer
7 from twisted.internet import reactor, defer
8 from twisted.internet.protocol import ProcessProtocol
8 from twisted.internet.protocol import ProcessProtocol
9 from twisted.python import failure, log
9 from twisted.python import failure, log
10 from twisted.internet.error import ProcessDone, ProcessTerminated
10 from twisted.internet.error import ProcessDone, ProcessTerminated
11 from twisted.internet.utils import getProcessOutput
11 from twisted.internet.utils import getProcessOutput
12
12
13 from IPython.external import argparse
13 from IPython.external import argparse
14 from IPython.external import Itpl
14 from IPython.external import Itpl
15 from IPython.kernel.twistedutil import gatherBoth
15 from IPython.kernel.twistedutil import gatherBoth
16 from IPython.kernel.util import printer
16 from IPython.kernel.util import printer
17 from IPython.genutils import get_ipython_dir, num_cpus
17 from IPython.genutils import get_ipython_dir, num_cpus
18
18
19 def find_exe(cmd):
19 def find_exe(cmd):
20 try:
20 try:
21 import win32api
21 import win32api
22 except ImportError:
22 except ImportError:
23 raise ImportError('you need to have pywin32 installed for this to work')
23 raise ImportError('you need to have pywin32 installed for this to work')
24 else:
24 else:
25 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd)
25 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd)
26 return path
26 return path
27
27
28 # Test local cluster on Win32
28 # Test local cluster on Win32
29 # Look at local cluster usage strings
29 # Look at local cluster usage strings
30 # PBS stuff
30 # PBS stuff
31
31
32 class ProcessStateError(Exception):
32 class ProcessStateError(Exception):
33 pass
33 pass
34
34
35 class UnknownStatus(Exception):
35 class UnknownStatus(Exception):
36 pass
36 pass
37
37
38 class LauncherProcessProtocol(ProcessProtocol):
38 class LauncherProcessProtocol(ProcessProtocol):
39 """
39 """
40 A ProcessProtocol to go with the ProcessLauncher.
40 A ProcessProtocol to go with the ProcessLauncher.
41 """
41 """
42 def __init__(self, process_launcher):
42 def __init__(self, process_launcher):
43 self.process_launcher = process_launcher
43 self.process_launcher = process_launcher
44
44
45 def connectionMade(self):
45 def connectionMade(self):
46 self.process_launcher.fire_start_deferred(self.transport.pid)
46 self.process_launcher.fire_start_deferred(self.transport.pid)
47
47
48 def processEnded(self, status):
48 def processEnded(self, status):
49 value = status.value
49 value = status.value
50 if isinstance(value, ProcessDone):
50 if isinstance(value, ProcessDone):
51 self.process_launcher.fire_stop_deferred(0)
51 self.process_launcher.fire_stop_deferred(0)
52 elif isinstance(value, ProcessTerminated):
52 elif isinstance(value, ProcessTerminated):
53 self.process_launcher.fire_stop_deferred(
53 self.process_launcher.fire_stop_deferred(
54 {'exit_code':value.exitCode,
54 {'exit_code':value.exitCode,
55 'signal':value.signal,
55 'signal':value.signal,
56 'status':value.status
56 'status':value.status
57 }
57 }
58 )
58 )
59 else:
59 else:
60 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
60 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
61
61
62 def outReceived(self, data):
62 def outReceived(self, data):
63 log.msg(data)
63 log.msg(data)
64
64
65 def errReceived(self, data):
65 def errReceived(self, data):
66 log.err(data)
66 log.err(data)
67
67
68 class ProcessLauncher(object):
68 class ProcessLauncher(object):
69 """
69 """
70 Start and stop an external process in an asynchronous manner.
70 Start and stop an external process in an asynchronous manner.
71
71
72 Currently this uses deferreds to notify other parties of process state
72 Currently this uses deferreds to notify other parties of process state
73 changes. This is an awkward design and should be moved to using
73 changes. This is an awkward design and should be moved to using
74 a formal NotificationCenter.
74 a formal NotificationCenter.
75 """
75 """
76 def __init__(self, cmd_and_args):
76 def __init__(self, cmd_and_args):
77 self.cmd = cmd_and_args[0]
77 self.cmd = cmd_and_args[0]
78 self.args = cmd_and_args
78 self.args = cmd_and_args
79 self._reset()
79 self._reset()
80
80
81 def _reset(self):
81 def _reset(self):
82 self.process_protocol = None
82 self.process_protocol = None
83 self.pid = None
83 self.pid = None
84 self.start_deferred = None
84 self.start_deferred = None
85 self.stop_deferreds = []
85 self.stop_deferreds = []
86 self.state = 'before' # before, running, or after
86 self.state = 'before' # before, running, or after
87
87
88 @property
88 @property
89 def running(self):
89 def running(self):
90 if self.state == 'running':
90 if self.state == 'running':
91 return True
91 return True
92 else:
92 else:
93 return False
93 return False
94
94
95 def fire_start_deferred(self, pid):
95 def fire_start_deferred(self, pid):
96 self.pid = pid
96 self.pid = pid
97 self.state = 'running'
97 self.state = 'running'
98 log.msg('Process %r has started with pid=%i' % (self.args, pid))
98 log.msg('Process %r has started with pid=%i' % (self.args, pid))
99 self.start_deferred.callback(pid)
99 self.start_deferred.callback(pid)
100
100
101 def start(self):
101 def start(self):
102 if self.state == 'before':
102 if self.state == 'before':
103 self.process_protocol = LauncherProcessProtocol(self)
103 self.process_protocol = LauncherProcessProtocol(self)
104 self.start_deferred = defer.Deferred()
104 self.start_deferred = defer.Deferred()
105 self.process_transport = reactor.spawnProcess(
105 self.process_transport = reactor.spawnProcess(
106 self.process_protocol,
106 self.process_protocol,
107 self.cmd,
107 self.cmd,
108 self.args,
108 self.args,
109 env=os.environ
109 env=os.environ
110 )
110 )
111 return self.start_deferred
111 return self.start_deferred
112 else:
112 else:
113 s = 'the process has already been started and has state: %r' % \
113 s = 'the process has already been started and has state: %r' % \
114 self.state
114 self.state
115 return defer.fail(ProcessStateError(s))
115 return defer.fail(ProcessStateError(s))
116
116
117 def get_stop_deferred(self):
117 def get_stop_deferred(self):
118 if self.state == 'running' or self.state == 'before':
118 if self.state == 'running' or self.state == 'before':
119 d = defer.Deferred()
119 d = defer.Deferred()
120 self.stop_deferreds.append(d)
120 self.stop_deferreds.append(d)
121 return d
121 return d
122 else:
122 else:
123 s = 'this process is already complete'
123 s = 'this process is already complete'
124 return defer.fail(ProcessStateError(s))
124 return defer.fail(ProcessStateError(s))
125
125
126 def fire_stop_deferred(self, exit_code):
126 def fire_stop_deferred(self, exit_code):
127 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
127 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
128 self.state = 'after'
128 self.state = 'after'
129 for d in self.stop_deferreds:
129 for d in self.stop_deferreds:
130 d.callback(exit_code)
130 d.callback(exit_code)
131
131
132 def signal(self, sig):
132 def signal(self, sig):
133 """
133 """
134 Send a signal to the process.
134 Send a signal to the process.
135
135
136 The argument sig can be ('KILL','INT', etc.) or any signal number.
136 The argument sig can be ('KILL','INT', etc.) or any signal number.
137 """
137 """
138 if self.state == 'running':
138 if self.state == 'running':
139 self.process_transport.signalProcess(sig)
139 self.process_transport.signalProcess(sig)
140
140
141 # def __del__(self):
141 # def __del__(self):
142 # self.signal('KILL')
142 # self.signal('KILL')
143
143
144 def interrupt_then_kill(self, delay=1.0):
144 def interrupt_then_kill(self, delay=1.0):
145 self.signal('INT')
145 self.signal('INT')
146 reactor.callLater(delay, self.signal, 'KILL')
146 reactor.callLater(delay, self.signal, 'KILL')
147
147
148
148
149 class ControllerLauncher(ProcessLauncher):
149 class ControllerLauncher(ProcessLauncher):
150
150
151 def __init__(self, extra_args=None):
151 def __init__(self, extra_args=None):
152 if sys.platform == 'win32':
152 if sys.platform == 'win32':
153 args = [find_exe('ipcontroller.bat')]
153 args = [find_exe('ipcontroller.bat')]
154 else:
154 else:
155 args = ['ipcontroller']
155 args = ['ipcontroller']
156 self.extra_args = extra_args
156 self.extra_args = extra_args
157 if extra_args is not None:
157 if extra_args is not None:
158 args.extend(extra_args)
158 args.extend(extra_args)
159
159
160 ProcessLauncher.__init__(self, args)
160 ProcessLauncher.__init__(self, args)
161
161
162
162
163 class EngineLauncher(ProcessLauncher):
163 class EngineLauncher(ProcessLauncher):
164
164
165 def __init__(self, extra_args=None):
165 def __init__(self, extra_args=None):
166 if sys.platform == 'win32':
166 if sys.platform == 'win32':
167 args = [find_exe('ipengine.bat')]
167 args = [find_exe('ipengine.bat')]
168 else:
168 else:
169 args = ['ipengine']
169 args = ['ipengine']
170 self.extra_args = extra_args
170 self.extra_args = extra_args
171 if extra_args is not None:
171 if extra_args is not None:
172 args.extend(extra_args)
172 args.extend(extra_args)
173
173
174 ProcessLauncher.__init__(self, args)
174 ProcessLauncher.__init__(self, args)
175
175
176
176
177 class LocalEngineSet(object):
177 class LocalEngineSet(object):
178
178
179 def __init__(self, extra_args=None):
179 def __init__(self, extra_args=None):
180 self.extra_args = extra_args
180 self.extra_args = extra_args
181 self.launchers = []
181 self.launchers = []
182
182
183 def start(self, n):
183 def start(self, n):
184 dlist = []
184 dlist = []
185 for i in range(n):
185 for i in range(n):
186 el = EngineLauncher(extra_args=self.extra_args)
186 el = EngineLauncher(extra_args=self.extra_args)
187 d = el.start()
187 d = el.start()
188 self.launchers.append(el)
188 self.launchers.append(el)
189 dlist.append(d)
189 dlist.append(d)
190 dfinal = gatherBoth(dlist, consumeErrors=True)
190 dfinal = gatherBoth(dlist, consumeErrors=True)
191 dfinal.addCallback(self._handle_start)
191 dfinal.addCallback(self._handle_start)
192 return dfinal
192 return dfinal
193
193
194 def _handle_start(self, r):
194 def _handle_start(self, r):
195 log.msg('Engines started with pids: %r' % r)
195 log.msg('Engines started with pids: %r' % r)
196 return r
196 return r
197
197
198 def _handle_stop(self, r):
198 def _handle_stop(self, r):
199 log.msg('Engines received signal: %r' % r)
199 log.msg('Engines received signal: %r' % r)
200 return r
200 return r
201
201
202 def signal(self, sig):
202 def signal(self, sig):
203 dlist = []
203 dlist = []
204 for el in self.launchers:
204 for el in self.launchers:
205 d = el.get_stop_deferred()
205 d = el.get_stop_deferred()
206 dlist.append(d)
206 dlist.append(d)
207 el.signal(sig)
207 el.signal(sig)
208 dfinal = gatherBoth(dlist, consumeErrors=True)
208 dfinal = gatherBoth(dlist, consumeErrors=True)
209 dfinal.addCallback(self._handle_stop)
209 dfinal.addCallback(self._handle_stop)
210 return dfinal
210 return dfinal
211
211
212 def interrupt_then_kill(self, delay=1.0):
212 def interrupt_then_kill(self, delay=1.0):
213 dlist = []
213 dlist = []
214 for el in self.launchers:
214 for el in self.launchers:
215 d = el.get_stop_deferred()
215 d = el.get_stop_deferred()
216 dlist.append(d)
216 dlist.append(d)
217 el.interrupt_then_kill(delay)
217 el.interrupt_then_kill(delay)
218 dfinal = gatherBoth(dlist, consumeErrors=True)
218 dfinal = gatherBoth(dlist, consumeErrors=True)
219 dfinal.addCallback(self._handle_stop)
219 dfinal.addCallback(self._handle_stop)
220 return dfinal
220 return dfinal
221
221
222
222
223 class BatchEngineSet(object):
223 class BatchEngineSet(object):
224
224
225 # Subclasses must fill these in. See PBSEngineSet
225 # Subclasses must fill these in. See PBSEngineSet
226 submit_command = ''
226 submit_command = ''
227 delete_command = ''
227 delete_command = ''
228 job_id_regexp = ''
228 job_id_regexp = ''
229
229
230 def __init__(self, template_file, **kwargs):
230 def __init__(self, template_file, **kwargs):
231 self.template_file = template_file
231 self.template_file = template_file
232 self.context = {}
232 self.context = {}
233 self.context.update(kwargs)
233 self.context.update(kwargs)
234 self.batch_file = 'batch-script'
234 self.batch_file = 'batch-script'
235
235
236 def parse_job_id(self, output):
236 def parse_job_id(self, output):
237 m = re.match(self.job_id_regexp, output)
237 m = re.match(self.job_id_regexp, output)
238 if m is not None:
238 if m is not None:
239 job_id = m.group()
239 job_id = m.group()
240 else:
240 else:
241 raise Exception("job id couldn't be determined: %s" % output)
241 raise Exception("job id couldn't be determined: %s" % output)
242 self.job_id = job_id
242 self.job_id = job_id
243 print 'Job started with job id:', job_id
243 print 'Job started with job id:', job_id
244 return job_id
244 return job_id
245
245
246 def write_batch_script(self, n):
246 def write_batch_script(self, n):
247 print 'n', n
247 print 'n', n
248 self.context['n'] = n
248 self.context['n'] = n
249 template = open(self.template_file, 'r').read()
249 template = open(self.template_file, 'r').read()
250 print 'template', template
250 print 'template', template
251 log.msg(template)
252 log.msg(repr(self.context))
251 script_as_string = Itpl.itplns(template, self.context)
253 script_as_string = Itpl.itplns(template, self.context)
252 print 'script', script_as_string
254 log.msg(script_as_string)
253 f = open(self.batch_file,'w')
255 f = open(self.batch_file,'w')
254 f.write(script_as_string)
256 f.write(script_as_string)
255 f.close()
257 f.close()
256
258
257 def handle_error(self, f):
259 def handle_error(self, f):
258 f.printTraceback()
260 f.printTraceback()
259 f.raiseException()
261 #f.raiseException()
260
262
261 def start(self, n):
263 def start(self, n):
262 self.write_batch_script(n)
264 self.write_batch_script(n)
263 d = getProcessOutput(self.submit_command,
265 d = getProcessOutput(self.submit_command,
264 [self.batch_file],env=os.environ)
266 [self.batch_file],env=os.environ)
265 d.addCallback(self.parse_job_id)
267 d.addCallback(self.parse_job_id)
266 #d.addErrback(self.handle_error)
268 d.addErrback(self.handle_error)
267 return d
269 return d
268
270
269 def kill(self):
271 def kill(self):
270 d = getProcessOutput(self.delete_command,
272 d = getProcessOutput(self.delete_command,
271 [self.job_id],env=os.environ)
273 [self.job_id],env=os.environ)
272 return d
274 return d
273
275
274 class PBSEngineSet(BatchEngineSet):
276 class PBSEngineSet(BatchEngineSet):
275
277
276 submit_command = 'qsub'
278 submit_command = 'qsub'
277 delete_command = 'qdel'
279 delete_command = 'qdel'
278 job_id_regexp = '\d+'
280 job_id_regexp = '\d+'
279
281
280 def __init__(self, template_file, **kwargs):
282 def __init__(self, template_file, **kwargs):
281 BatchEngineSet.__init__(self, template_file, **kwargs)
283 BatchEngineSet.__init__(self, template_file, **kwargs)
282
284
283
285
284 def main_local(args):
286 def main_local(args):
285 cont_args = []
287 cont_args = []
286 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
288 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
287 if args.x:
289 if args.x:
288 cont_args.append('-x')
290 cont_args.append('-x')
289 if args.y:
291 if args.y:
290 cont_args.append('-y')
292 cont_args.append('-y')
291 cl = ControllerLauncher(extra_args=cont_args)
293 cl = ControllerLauncher(extra_args=cont_args)
292 dstart = cl.start()
294 dstart = cl.start()
293 def start_engines(cont_pid):
295 def start_engines(cont_pid):
294 engine_args = []
296 engine_args = []
295 engine_args.append('--logfile=%s' % \
297 engine_args.append('--logfile=%s' % \
296 pjoin(args.logdir,'ipengine%s-' % cont_pid))
298 pjoin(args.logdir,'ipengine%s-' % cont_pid))
297 eset = LocalEngineSet(extra_args=engine_args)
299 eset = LocalEngineSet(extra_args=engine_args)
298 def shutdown(signum, frame):
300 def shutdown(signum, frame):
299 log.msg('Stopping local cluster')
301 log.msg('Stopping local cluster')
300 # We are still playing with the times here, but these seem
302 # We are still playing with the times here, but these seem
301 # to be reliable in allowing everything to exit cleanly.
303 # to be reliable in allowing everything to exit cleanly.
302 eset.interrupt_then_kill(0.5)
304 eset.interrupt_then_kill(0.5)
303 cl.interrupt_then_kill(0.5)
305 cl.interrupt_then_kill(0.5)
304 reactor.callLater(1.0, reactor.stop)
306 reactor.callLater(1.0, reactor.stop)
305 signal.signal(signal.SIGINT,shutdown)
307 signal.signal(signal.SIGINT,shutdown)
306 d = eset.start(args.n)
308 d = eset.start(args.n)
307 return d
309 return d
308 def delay_start(cont_pid):
310 def delay_start(cont_pid):
309 # This is needed because the controller doesn't start listening
311 # This is needed because the controller doesn't start listening
310 # right when it starts and the controller needs to write
312 # right when it starts and the controller needs to write
311 # furl files for the engine to pick up
313 # furl files for the engine to pick up
312 reactor.callLater(1.0, start_engines, cont_pid)
314 reactor.callLater(1.0, start_engines, cont_pid)
313 dstart.addCallback(delay_start)
315 dstart.addCallback(delay_start)
314 dstart.addErrback(lambda f: f.raiseException())
316 dstart.addErrback(lambda f: f.raiseException())
315
317
316 def main_mpirun(args):
318 def main_mpirun(args):
317 cont_args = []
319 cont_args = []
318 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
320 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
319 if args.x:
321 if args.x:
320 cont_args.append('-x')
322 cont_args.append('-x')
321 if args.y:
323 if args.y:
322 cont_args.append('-y')
324 cont_args.append('-y')
323 cl = ControllerLauncher(extra_args=cont_args)
325 cl = ControllerLauncher(extra_args=cont_args)
324 dstart = cl.start()
326 dstart = cl.start()
325 def start_engines(cont_pid):
327 def start_engines(cont_pid):
326 raw_args = ['mpirun']
328 raw_args = ['mpirun']
327 raw_args.extend(['-n',str(args.n)])
329 raw_args.extend(['-n',str(args.n)])
328 raw_args.append('ipengine')
330 raw_args.append('ipengine')
329 raw_args.append('-l')
331 raw_args.append('-l')
330 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
332 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
331 raw_args.append('--mpi=%s' % args.mpi)
333 raw_args.append('--mpi=%s' % args.mpi)
332 eset = ProcessLauncher(raw_args)
334 eset = ProcessLauncher(raw_args)
333 def shutdown(signum, frame):
335 def shutdown(signum, frame):
334 log.msg('Stopping local cluster')
336 log.msg('Stopping local cluster')
335 # We are still playing with the times here, but these seem
337 # We are still playing with the times here, but these seem
336 # to be reliable in allowing everything to exit cleanly.
338 # to be reliable in allowing everything to exit cleanly.
337 eset.interrupt_then_kill(1.0)
339 eset.interrupt_then_kill(1.0)
338 cl.interrupt_then_kill(1.0)
340 cl.interrupt_then_kill(1.0)
339 reactor.callLater(2.0, reactor.stop)
341 reactor.callLater(2.0, reactor.stop)
340 signal.signal(signal.SIGINT,shutdown)
342 signal.signal(signal.SIGINT,shutdown)
341 d = eset.start()
343 d = eset.start()
342 return d
344 return d
343 def delay_start(cont_pid):
345 def delay_start(cont_pid):
344 # This is needed because the controller doesn't start listening
346 # This is needed because the controller doesn't start listening
345 # right when it starts and the controller needs to write
347 # right when it starts and the controller needs to write
346 # furl files for the engine to pick up
348 # furl files for the engine to pick up
347 reactor.callLater(1.0, start_engines, cont_pid)
349 reactor.callLater(1.0, start_engines, cont_pid)
348 dstart.addCallback(delay_start)
350 dstart.addCallback(delay_start)
349 dstart.addErrback(lambda f: f.raiseException())
351 dstart.addErrback(lambda f: f.raiseException())
350
352
351 def main_pbs(args):
353 def main_pbs(args):
352 cl = ControllerLauncher()
354 cont_args = []
355 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
356 if args.x:
357 cont_args.append('-x')
358 if args.y:
359 cont_args.append('-y')
360 cl = ControllerLauncher(extra_args=cont_args)
353 dstart = cl.start()
361 dstart = cl.start()
354 def start_engines(r):
362 def start_engines(r):
355 pbs_set = PBSEngineSet('pbs.template')
363 pbs_set = PBSEngineSet(args.pbsscript)
356 print pbs_set.template_file
357 d = pbs_set.start(args.n)
364 d = pbs_set.start(args.n)
358 return d
365 return d
359 dstart.addCallback(start_engines)
366 dstart.addCallback(start_engines)
360 dstart.addErrback(lambda f: f.printTraceback())
367 dstart.addErrback(lambda f: f.raiseException())
361
368
362
369
363 def get_args():
370 def get_args():
364 base_parser = argparse.ArgumentParser(add_help=False)
371 base_parser = argparse.ArgumentParser(add_help=False)
365 base_parser.add_argument(
372 base_parser.add_argument(
366 '-x',
373 '-x',
367 action='store_true',
374 action='store_true',
368 dest='x',
375 dest='x',
369 help='turn off client security'
376 help='turn off client security'
370 )
377 )
371 base_parser.add_argument(
378 base_parser.add_argument(
372 '-y',
379 '-y',
373 action='store_true',
380 action='store_true',
374 dest='y',
381 dest='y',
375 help='turn off engine security'
382 help='turn off engine security'
376 )
383 )
377 base_parser.add_argument(
384 base_parser.add_argument(
378 "--logdir",
385 "--logdir",
379 type=str,
386 type=str,
380 dest="logdir",
387 dest="logdir",
381 help="directory to put log files (default=$IPYTHONDIR/log)",
388 help="directory to put log files (default=$IPYTHONDIR/log)",
382 default=pjoin(get_ipython_dir(),'log')
389 default=pjoin(get_ipython_dir(),'log')
383 )
390 )
384 base_parser.add_argument(
391 base_parser.add_argument(
385 "-n",
392 "-n",
386 "--num",
393 "--num",
387 type=int,
394 type=int,
388 dest="n",
395 dest="n",
389 default=2,
396 default=2,
390 help="the number of engines to start"
397 help="the number of engines to start"
391 )
398 )
392
399
393 parser = argparse.ArgumentParser(
400 parser = argparse.ArgumentParser(
394 description='IPython cluster startup. This starts a controller and\
401 description='IPython cluster startup. This starts a controller and\
395 engines using various approaches'
402 engines using various approaches'
396 )
403 )
397 subparsers = parser.add_subparsers(
404 subparsers = parser.add_subparsers(
398 help='available cluster types. For help, do "ipcluster TYPE --help"')
405 help='available cluster types. For help, do "ipcluster TYPE --help"')
399
406
400 parser_local = subparsers.add_parser(
407 parser_local = subparsers.add_parser(
401 'local',
408 'local',
402 help='run a local cluster',
409 help='run a local cluster',
403 parents=[base_parser]
410 parents=[base_parser]
404 )
411 )
405 parser_local.set_defaults(func=main_local)
412 parser_local.set_defaults(func=main_local)
406
413
407 parser_mpirun = subparsers.add_parser(
414 parser_mpirun = subparsers.add_parser(
408 'mpirun',
415 'mpirun',
409 help='run a cluster using mpirun',
416 help='run a cluster using mpirun',
410 parents=[base_parser]
417 parents=[base_parser]
411 )
418 )
412 parser_mpirun.add_argument(
419 parser_mpirun.add_argument(
413 "--mpi",
420 "--mpi",
414 type=str,
421 type=str,
415 dest="mpi",
422 dest="mpi",
416 default='mpi4py',
423 default='mpi4py',
417 help="how to call MPI_Init (default=mpi4py)"
424 help="how to call MPI_Init (default=mpi4py)"
418 )
425 )
419 parser_mpirun.set_defaults(func=main_mpirun)
426 parser_mpirun.set_defaults(func=main_mpirun)
420
427
421 parser_pbs = subparsers.add_parser('pbs', help='run a pbs cluster')
428 parser_pbs = subparsers.add_parser(
422 parser_pbs.add_argument('--pbs-script', type=str, dest='pbsscript',
429 'pbs',
423 help='PBS script template')
430 help='run a pbs cluster',
431 parents=[base_parser]
432 )
433 parser_pbs.add_argument(
434 '--pbs-script',
435 type=str,
436 dest='pbsscript',
437 help='PBS script template',
438 default='pbs.template'
439 )
424 parser_pbs.set_defaults(func=main_pbs)
440 parser_pbs.set_defaults(func=main_pbs)
425 args = parser.parse_args()
441 args = parser.parse_args()
426 return args
442 return args
427
443
428 def main():
444 def main():
429 args = get_args()
445 args = get_args()
430 reactor.callWhenRunning(args.func, args)
446 reactor.callWhenRunning(args.func, args)
431 log.startLogging(sys.stdout)
447 log.startLogging(sys.stdout)
432 reactor.run()
448 reactor.run()
433
449
434 if __name__ == '__main__':
450 if __name__ == '__main__':
435 main()
451 main()
General Comments 0
You need to be logged in to leave comments. Login now