##// END OF EJS Templates
More changes to the PBS batch cluster.
Brian Granger -
Show More
@@ -1,451 +1,455 b''
1 import os
1 import os
2 import re
2 import re
3 import sys
3 import sys
4 import signal
4 import signal
5 pjoin = os.path.join
5 pjoin = os.path.join
6
6
7 from twisted.internet import reactor, defer
7 from twisted.internet import reactor, defer
8 from twisted.internet.protocol import ProcessProtocol
8 from twisted.internet.protocol import ProcessProtocol
9 from twisted.python import failure, log
9 from twisted.python import failure, log
10 from twisted.internet.error import ProcessDone, ProcessTerminated
10 from twisted.internet.error import ProcessDone, ProcessTerminated
11 from twisted.internet.utils import getProcessOutput
11 from twisted.internet.utils import getProcessOutput
12
12
13 from IPython.external import argparse
13 from IPython.external import argparse
14 from IPython.external import Itpl
14 from IPython.external import Itpl
15 from IPython.kernel.twistedutil import gatherBoth
15 from IPython.kernel.twistedutil import gatherBoth
16 from IPython.kernel.util import printer
16 from IPython.kernel.util import printer
17 from IPython.genutils import get_ipython_dir, num_cpus
17 from IPython.genutils import get_ipython_dir, num_cpus
18
18
def find_exe(cmd):
    """
    Locate an executable on the Windows search path.

    The lookup is done with ``win32api.SearchPath`` against the current
    ``PATH`` environment variable, so pywin32 must be installed.

    :param cmd: file name of the executable to look for
        (for example ``'ipengine.bat'``).
    :returns: the path reported by ``win32api.SearchPath``.
    :raises ImportError: if pywin32 (``win32api``) cannot be imported.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    else:
        # SearchPath returns a (path, offset) pair; only the path is needed.
        (path, _offset) = win32api.SearchPath(os.environ['PATH'], cmd)
        return path
27
27
# Test local cluster on Win32
# Look at local cluster usage strings
# PBS stuff
31
31
class ProcessStateError(Exception):
    """
    Raised when an operation is not valid for a process' current state.
    """
    pass
34
34
class UnknownStatus(Exception):
    """
    Raised when a process ends with a status that is not recognized.
    """
    pass
37
37
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    It forwards process start/stop events to the launcher that owns it
    and sends the child's stdout/stderr to the Twisted log.
    """
    def __init__(self, process_launcher):
        # The launcher whose start/stop deferreds this protocol fires.
        self.process_launcher = process_launcher

    def connectionMade(self):
        """Tell the launcher the process is up, passing along its pid."""
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        """
        Report the end of the process to the launcher.

        A ``ProcessDone`` status is reported as ``0``.  A
        ``ProcessTerminated`` status is reported as a dict with the keys
        ``exit_code``, ``signal`` and ``status``.  Anything else raises
        ``UnknownStatus``.
        """
        value = status.value
        if isinstance(value, ProcessDone):
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            self.process_launcher.fire_stop_deferred(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        """Log data the process wrote to stdout."""
        log.msg(data)

    def errReceived(self, data):
        """Log data the process wrote to stderr as an error."""
        log.err(data)
67
67
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes.  This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        """
        :param cmd_and_args: sequence whose first item is the command to
            run; the whole sequence is passed as the argument list.
        """
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        """Put the launcher back into its initial 'before' state."""
        self.process_protocol = None
        # Set by start(); initialised here so the attribute always exists.
        self.process_transport = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        """True only while the state is 'running'."""
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        """Record the pid, mark the process running and fire start_deferred."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """
        Spawn the process.

        :returns: a deferred that fires with the pid once the process has
            started, or an already failed deferred carrying
            ``ProcessStateError`` if the state is not 'before'.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """
        Return a new deferred that fires when the process stops.

        If the process is already in the 'after' state, an already failed
        deferred carrying ``ProcessStateError`` is returned instead.
        """
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        """Mark the process finished and fire every registered stop deferred."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Nothing is sent unless the process is in the 'running' state.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """Send 'INT' now and schedule 'KILL' after ``delay`` seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
147
147
148
148
class ControllerLauncher(ProcessLauncher):
    """
    A ProcessLauncher preconfigured to run the ``ipcontroller`` command.
    """

    def __init__(self, extra_args=None):
        """
        :param extra_args: optional list of additional command line
            arguments appended after the command itself.
        """
        if sys.platform == 'win32':
            # On Windows the .bat wrapper is looked up on PATH.
            args = [find_exe('ipcontroller.bat')]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
161
161
162
162
class EngineLauncher(ProcessLauncher):
    """
    A ProcessLauncher preconfigured to run the ``ipengine`` command.
    """

    def __init__(self, extra_args=None):
        """
        :param extra_args: optional list of additional command line
            arguments appended after the command itself.
        """
        if sys.platform == 'win32':
            # On Windows the .bat wrapper is looked up on PATH.
            args = [find_exe('ipengine.bat')]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
175
175
176
176
class LocalEngineSet(object):
    """
    A group of engines started as local processes, one EngineLauncher each.
    """

    def __init__(self, extra_args=None):
        """
        :param extra_args: optional list of extra command line arguments
            handed to every EngineLauncher that is created.
        """
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """
        Start ``n`` engines.

        :returns: the deferred produced by ``gatherBoth`` over the start
            deferreds of all the launchers.
        """
        dlist = []
        for i in range(n):
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def _stop_each(self, stop_one):
        """Apply ``stop_one`` to every launcher and gather the stop deferreds."""
        dlist = []
        for el in self.launchers:
            # Register for the stop notification before acting on the engine.
            d = el.get_stop_deferred()
            dlist.append(d)
            stop_one(el)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def signal(self, sig):
        """
        Send ``sig`` to every engine.

        :returns: the deferred produced by ``gatherBoth`` over the stop
            deferreds of all the launchers.
        """
        return self._stop_each(lambda el: el.signal(sig))

    def interrupt_then_kill(self, delay=1.0):
        """
        Call ``interrupt_then_kill(delay)`` on every engine launcher.

        :returns: the deferred produced by ``gatherBoth`` over the stop
            deferreds of all the launchers.
        """
        return self._stop_each(lambda el: el.interrupt_then_kill(delay))
221
221
222
222
class BatchEngineSet(object):
    """
    Base class for starting engines through a batch system.

    A template file is instantiated with ``Itpl.itplns`` using the
    keyword context, written to ``<template_file>-run``, and submitted
    with ``submit_command``.  The job is removed with ``delete_command``.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        """
        :param template_file: path of the batch script template.
        :param kwargs: names made available to the template.
        """
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'
        # Unknown until a submission has been parsed by parse_job_id().
        self.job_id = None

    def parse_job_id(self, output):
        """
        Extract and remember the job id from the submit command's output.

        ``job_id_regexp`` is applied with ``re.match``, i.e. at the start
        of ``output``.

        :returns: the matched job id.
        :raises Exception: if the pattern does not match.
        """
        m = re.match(self.job_id_regexp, output)
        if m is None:
            raise Exception("job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """
        Instantiate the template for ``n`` engines and write the batch file.

        ``n`` is stored in the context under the key ``'n'``.
        """
        self.context['n'] = n
        # Close the template explicitly rather than relying on garbage
        # collection to release the file handle.
        template_f = open(self.template_file, 'r')
        try:
            template = template_f.read()
        finally:
            template_f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        log.msg(repr(self.context))
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        """Errback: print the failure's traceback, then re-raise it."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """
        Write the batch script for ``n`` engines and submit it.

        :returns: the deferred from ``getProcessOutput`` with
            ``parse_job_id`` and ``handle_error`` attached.
        """
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
            [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """
        Run the delete command on the submitted job.

        :returns: the deferred from ``getProcessOutput``, or an already
            fired deferred (result ``None``) if no job id is known yet.
        """
        if self.job_id is None:
            # Nothing has been submitted (or its id has not been parsed
            # yet), so there is no job to delete.
            log.msg('No batch job id is known; nothing to delete')
            return defer.succeed(None)
        d = getProcessOutput(self.delete_command,
            [self.job_id],env=os.environ)
        return d
275
273
class PBSEngineSet(BatchEngineSet):
    """
    A BatchEngineSet that submits with ``qsub`` and deletes with ``qdel``.
    """

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string so that \d is a regex digit class, not a string escape.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
284
282
285
283
def main_local(args):
    """
    Start a controller and ``args.n`` engines as local processes.

    The controller is started first; the engines are started 1.0 second
    after the controller process reports that it is running.  A SIGINT
    handler is installed that stops the engines and the controller and
    then stops the reactor.

    :param args: parsed command line arguments; reads ``logdir``, ``x``,
        ``y`` and ``n``.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
317
315
def main_mpirun(args):
    """
    Start a local controller and launch ``args.n`` engines with mpirun.

    The controller is started first; ``mpirun -n <n> ipengine ...`` is
    started 1.0 second after the controller process reports that it is
    running.  A SIGINT handler is installed that stops the mpirun process
    and the controller and then stops the reactor.

    :param args: parsed command line arguments; reads ``logdir``, ``x``,
        ``y``, ``n`` and ``mpi``.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        raw_args = ['mpirun']
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
352
350
def main_pbs(args):
    """
    Start a local controller and submit a PBS job for ``args.n`` engines.

    Once the controller process reports that it is running, the PBS
    script template ``args.pbsscript`` is instantiated and submitted.  A
    SIGINT handler is installed that deletes the PBS job, then stops the
    controller and the reactor.

    :param args: parsed command line arguments; reads ``logdir``, ``x``,
        ``y``, ``n`` and ``pbsscript``.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            d = pbs_set.kill()
            # addBoth: carry on with the shutdown whether or not the
            # delete command succeeded.
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.raiseException())
368
372
369
373
def get_args():
    """
    Build the ``ipcluster`` command line parser and parse the arguments.

    The sub-commands ``local``, ``mpirun`` and ``pbs`` share the options
    ``-x``, ``-y``, ``--logdir`` and ``-n/--num``.  Each sub-command stores
    its entry point (``main_local``, ``main_mpirun`` or ``main_pbs``) as
    ``func`` on the result.

    :returns: the namespace returned by ``parser.parse_args()``.
    """
    # Options common to every cluster type; used as a parent parser.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    # Adjacent literals instead of a backslash continuation inside the
    # string, which would embed the source indentation in the text.
    parser = argparse.ArgumentParser(
        description='IPython cluster startup.  This starts a controller and '
                    'engines using various approaches'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types.  For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi",
        default='mpi4py',
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpirun)

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)
    args = parser.parse_args()
    return args
443
447
def main():
    """
    Command line entry point.

    Parses the arguments, schedules the selected cluster start function
    to run once the reactor is running, starts logging to stdout and
    runs the reactor.
    """
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()
449
453
# Run the cluster launcher only when executed as a script.
if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now