##// END OF EJS Templates
The local mode of ipcluster now works on Win32.
Brian Granger -
Show More
@@ -1,381 +1,435 b''
1 import os
1 import os
2 import re
2 import re
3 import sys
3 import sys
4 import signal
4 import signal
5 pjoin = os.path.join
5 pjoin = os.path.join
6
6
7 from twisted.internet import reactor, defer
7 from twisted.internet import reactor, defer
8 from twisted.internet.protocol import ProcessProtocol
8 from twisted.internet.protocol import ProcessProtocol
9 from twisted.python import failure, log
9 from twisted.python import failure, log
10 from twisted.internet.error import ProcessDone, ProcessTerminated
10 from twisted.internet.error import ProcessDone, ProcessTerminated
11 from twisted.internet.utils import getProcessOutput
11 from twisted.internet.utils import getProcessOutput
12
12
13 from IPython.external import argparse
13 from IPython.external import argparse
14 from IPython.external import Itpl
14 from IPython.external import Itpl
15 from IPython.kernel.twistedutil import gatherBoth
15 from IPython.kernel.twistedutil import gatherBoth
16 from IPython.kernel.util import printer
16 from IPython.kernel.util import printer
17 from IPython.genutils import get_ipython_dir
17 from IPython.genutils import get_ipython_dir, num_cpus
18
18
19 def find_exe(cmd):
20 try:
21 import win32api
22 except ImportError:
23 raise ImportError('you need to have pywin32 installed for this to work')
24 else:
25 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd)
26 return path
19
27
20 # Test local cluster on Win32
28 # Test local cluster on Win32
21 # Look at local cluster usage strings
29 # Look at local cluster usage strings
22 # PBS stuff
30 # PBS stuff
23
31
24 class ProcessStateError(Exception):
32 class ProcessStateError(Exception):
25 pass
33 pass
26
34
27 class UnknownStatus(Exception):
35 class UnknownStatus(Exception):
28 pass
36 pass
29
37
30 class LauncherProcessProtocol(ProcessProtocol):
38 class LauncherProcessProtocol(ProcessProtocol):
31 """
39 """
32 A ProcessProtocol to go with the ProcessLauncher.
40 A ProcessProtocol to go with the ProcessLauncher.
33 """
41 """
34 def __init__(self, process_launcher):
42 def __init__(self, process_launcher):
35 self.process_launcher = process_launcher
43 self.process_launcher = process_launcher
36
44
37 def connectionMade(self):
45 def connectionMade(self):
38 self.process_launcher.fire_start_deferred(self.transport.pid)
46 self.process_launcher.fire_start_deferred(self.transport.pid)
39
47
40 def processEnded(self, status):
48 def processEnded(self, status):
41 value = status.value
49 value = status.value
42 if isinstance(value, ProcessDone):
50 if isinstance(value, ProcessDone):
43 self.process_launcher.fire_stop_deferred(0)
51 self.process_launcher.fire_stop_deferred(0)
44 elif isinstance(value, ProcessTerminated):
52 elif isinstance(value, ProcessTerminated):
45 self.process_launcher.fire_stop_deferred(
53 self.process_launcher.fire_stop_deferred(
46 {'exit_code':value.exitCode,
54 {'exit_code':value.exitCode,
47 'signal':value.signal,
55 'signal':value.signal,
48 'status':value.status
56 'status':value.status
49 }
57 }
50 )
58 )
51 else:
59 else:
52 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
60 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
53
61
54 def outReceived(self, data):
62 def outReceived(self, data):
55 log.msg(data)
63 log.msg(data)
56
64
57 def errReceived(self, data):
65 def errReceived(self, data):
58 log.err(data)
66 log.err(data)
59
67
60 class ProcessLauncher(object):
68 class ProcessLauncher(object):
61 """
69 """
62 Start and stop an external process in an asynchronous manner.
70 Start and stop an external process in an asynchronous manner.
63
71
64 Currently this uses deferreds to notify other parties of process state
72 Currently this uses deferreds to notify other parties of process state
65 changes. This is an awkward design and should be moved to using
73 changes. This is an awkward design and should be moved to using
66 a formal NotificationCenter.
74 a formal NotificationCenter.
67 """
75 """
68 def __init__(self, cmd_and_args):
76 def __init__(self, cmd_and_args):
69
70 self.cmd = cmd_and_args[0]
77 self.cmd = cmd_and_args[0]
71 self.args = cmd_and_args
78 self.args = cmd_and_args
72 self._reset()
79 self._reset()
73
80
74 def _reset(self):
81 def _reset(self):
75 self.process_protocol = None
82 self.process_protocol = None
76 self.pid = None
83 self.pid = None
77 self.start_deferred = None
84 self.start_deferred = None
78 self.stop_deferreds = []
85 self.stop_deferreds = []
79 self.state = 'before' # before, running, or after
86 self.state = 'before' # before, running, or after
80
87
81 @property
88 @property
82 def running(self):
89 def running(self):
83 if self.state == 'running':
90 if self.state == 'running':
84 return True
91 return True
85 else:
92 else:
86 return False
93 return False
87
94
88 def fire_start_deferred(self, pid):
95 def fire_start_deferred(self, pid):
89 self.pid = pid
96 self.pid = pid
90 self.state = 'running'
97 self.state = 'running'
91 log.msg('Process %r has started with pid=%i' % (self.args, pid))
98 log.msg('Process %r has started with pid=%i' % (self.args, pid))
92 self.start_deferred.callback(pid)
99 self.start_deferred.callback(pid)
93
100
94 def start(self):
101 def start(self):
95 if self.state == 'before':
102 if self.state == 'before':
96 self.process_protocol = LauncherProcessProtocol(self)
103 self.process_protocol = LauncherProcessProtocol(self)
97 self.start_deferred = defer.Deferred()
104 self.start_deferred = defer.Deferred()
98 self.process_transport = reactor.spawnProcess(
105 self.process_transport = reactor.spawnProcess(
99 self.process_protocol,
106 self.process_protocol,
100 self.cmd,
107 self.cmd,
101 self.args,
108 self.args,
102 env=os.environ
109 env=os.environ
103 )
110 )
104 return self.start_deferred
111 return self.start_deferred
105 else:
112 else:
106 s = 'the process has already been started and has state: %r' % \
113 s = 'the process has already been started and has state: %r' % \
107 self.state
114 self.state
108 return defer.fail(ProcessStateError(s))
115 return defer.fail(ProcessStateError(s))
109
116
110 def get_stop_deferred(self):
117 def get_stop_deferred(self):
111 if self.state == 'running' or self.state == 'before':
118 if self.state == 'running' or self.state == 'before':
112 d = defer.Deferred()
119 d = defer.Deferred()
113 self.stop_deferreds.append(d)
120 self.stop_deferreds.append(d)
114 return d
121 return d
115 else:
122 else:
116 s = 'this process is already complete'
123 s = 'this process is already complete'
117 return defer.fail(ProcessStateError(s))
124 return defer.fail(ProcessStateError(s))
118
125
119 def fire_stop_deferred(self, exit_code):
126 def fire_stop_deferred(self, exit_code):
120 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
127 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
121 self.state = 'after'
128 self.state = 'after'
122 for d in self.stop_deferreds:
129 for d in self.stop_deferreds:
123 d.callback(exit_code)
130 d.callback(exit_code)
124
131
125 def signal(self, sig):
132 def signal(self, sig):
126 """
133 """
127 Send a signal to the process.
134 Send a signal to the process.
128
135
129 The argument sig can be ('KILL','INT', etc.) or any signal number.
136 The argument sig can be ('KILL','INT', etc.) or any signal number.
130 """
137 """
131 if self.state == 'running':
138 if self.state == 'running':
132 self.process_transport.signalProcess(sig)
139 self.process_transport.signalProcess(sig)
133
140
134 def __del__(self):
141 # def __del__(self):
135 self.signal('KILL')
142 # self.signal('KILL')
136
143
137 def interrupt_then_kill(self, delay=1.0):
144 def interrupt_then_kill(self, delay=1.0):
138 self.signal('INT')
145 self.signal('INT')
139 reactor.callLater(delay, self.signal, 'KILL')
146 reactor.callLater(delay, self.signal, 'KILL')
140
147
141
148
142 class ControllerLauncher(ProcessLauncher):
149 class ControllerLauncher(ProcessLauncher):
143
150
144 def __init__(self, extra_args=None):
151 def __init__(self, extra_args=None):
145 self.args = ['ipcontroller']
152 if sys.platform == 'win32':
153 args = [find_exe('ipcontroller.bat')]
154 else:
155 args = ['ipcontroller']
146 self.extra_args = extra_args
156 self.extra_args = extra_args
147 if extra_args is not None:
157 if extra_args is not None:
148 self.args.extend(extra_args)
158 args.extend(extra_args)
149
159
150 ProcessLauncher.__init__(self, self.args)
160 ProcessLauncher.__init__(self, args)
151
161
152
162
153 class EngineLauncher(ProcessLauncher):
163 class EngineLauncher(ProcessLauncher):
154
164
155 def __init__(self, extra_args=None):
165 def __init__(self, extra_args=None):
156 self.args = ['ipengine']
166 if sys.platform == 'win32':
167 args = [find_exe('ipengine.bat')]
168 else:
169 args = ['ipengine']
157 self.extra_args = extra_args
170 self.extra_args = extra_args
158 if extra_args is not None:
171 if extra_args is not None:
159 self.args.extend(extra_args)
172 args.extend(extra_args)
160
173
161 ProcessLauncher.__init__(self, self.args)
174 ProcessLauncher.__init__(self, args)
162
175
163
176
164 class LocalEngineSet(object):
177 class LocalEngineSet(object):
165
178
166 def __init__(self, extra_args=None):
179 def __init__(self, extra_args=None):
167 self.extra_args = extra_args
180 self.extra_args = extra_args
168 self.launchers = []
181 self.launchers = []
169
182
170 def start(self, n):
183 def start(self, n):
171 dlist = []
184 dlist = []
172 for i in range(n):
185 for i in range(n):
173 el = EngineLauncher(extra_args=self.extra_args)
186 el = EngineLauncher(extra_args=self.extra_args)
174 d = el.start()
187 d = el.start()
175 self.launchers.append(el)
188 self.launchers.append(el)
176 dlist.append(d)
189 dlist.append(d)
177 dfinal = gatherBoth(dlist, consumeErrors=True)
190 dfinal = gatherBoth(dlist, consumeErrors=True)
178 dfinal.addCallback(self._handle_start)
191 dfinal.addCallback(self._handle_start)
179 return dfinal
192 return dfinal
180
193
181 def _handle_start(self, r):
194 def _handle_start(self, r):
182 log.msg('Engines started with pids: %r' % r)
195 log.msg('Engines started with pids: %r' % r)
183 return r
196 return r
184
197
185 def _handle_stop(self, r):
198 def _handle_stop(self, r):
186 log.msg('Engines received signal: %r' % r)
199 log.msg('Engines received signal: %r' % r)
187 return r
200 return r
188
201
189 def signal(self, sig):
202 def signal(self, sig):
190 dlist = []
203 dlist = []
191 for el in self.launchers:
204 for el in self.launchers:
192 d = el.get_stop_deferred()
205 d = el.get_stop_deferred()
193 dlist.append(d)
206 dlist.append(d)
194 el.signal(sig)
207 el.signal(sig)
195 dfinal = gatherBoth(dlist, consumeErrors=True)
208 dfinal = gatherBoth(dlist, consumeErrors=True)
196 dfinal.addCallback(self._handle_stop)
209 dfinal.addCallback(self._handle_stop)
197 return dfinal
210 return dfinal
198
211
199 def interrupt_then_kill(self, delay=1.0):
212 def interrupt_then_kill(self, delay=1.0):
200 dlist = []
213 dlist = []
201 for el in self.launchers:
214 for el in self.launchers:
202 d = el.get_stop_deferred()
215 d = el.get_stop_deferred()
203 dlist.append(d)
216 dlist.append(d)
204 el.interrupt_then_kill(delay)
217 el.interrupt_then_kill(delay)
205 dfinal = gatherBoth(dlist, consumeErrors=True)
218 dfinal = gatherBoth(dlist, consumeErrors=True)
206 dfinal.addCallback(self._handle_stop)
219 dfinal.addCallback(self._handle_stop)
207 return dfinal
220 return dfinal
208
221
209
222
210 class BatchEngineSet(object):
223 class BatchEngineSet(object):
211
224
212 # Subclasses must fill these in. See PBSEngineSet
225 # Subclasses must fill these in. See PBSEngineSet
213 submit_command = ''
226 submit_command = ''
214 delete_command = ''
227 delete_command = ''
215 job_id_regexp = ''
228 job_id_regexp = ''
216
229
217 def __init__(self, template_file, **kwargs):
230 def __init__(self, template_file, **kwargs):
218 self.template_file = template_file
231 self.template_file = template_file
219 self.context = {}
232 self.context = {}
220 self.context.update(kwargs)
233 self.context.update(kwargs)
221 self.batch_file = 'batch-script'
234 self.batch_file = 'batch-script'
222
235
223 def parse_job_id(self, output):
236 def parse_job_id(self, output):
224 m = re.match(self.job_id_regexp, output)
237 m = re.match(self.job_id_regexp, output)
225 if m is not None:
238 if m is not None:
226 job_id = m.group()
239 job_id = m.group()
227 else:
240 else:
228 raise Exception("job id couldn't be determined: %s" % output)
241 raise Exception("job id couldn't be determined: %s" % output)
229 self.job_id = job_id
242 self.job_id = job_id
230 print 'Job started with job id:', job_id
243 print 'Job started with job id:', job_id
231 return job_id
244 return job_id
232
245
233 def write_batch_script(self, n):
246 def write_batch_script(self, n):
234 print 'n', n
247 print 'n', n
235 self.context['n'] = n
248 self.context['n'] = n
236 template = open(self.template_file, 'r').read()
249 template = open(self.template_file, 'r').read()
237 print 'template', template
250 print 'template', template
238 script_as_string = Itpl.itplns(template, self.context)
251 script_as_string = Itpl.itplns(template, self.context)
239 print 'script', script_as_string
252 print 'script', script_as_string
240 f = open(self.batch_file,'w')
253 f = open(self.batch_file,'w')
241 f.write(script_as_string)
254 f.write(script_as_string)
242 f.close()
255 f.close()
243
256
244 def handle_error(self, f):
257 def handle_error(self, f):
245 f.printTraceback()
258 f.printTraceback()
246 f.raiseException()
259 f.raiseException()
247
260
248 def start(self, n):
261 def start(self, n):
249 self.write_batch_script(n)
262 self.write_batch_script(n)
250 d = getProcessOutput(self.submit_command,
263 d = getProcessOutput(self.submit_command,
251 [self.batch_file],env=os.environ)
264 [self.batch_file],env=os.environ)
252 d.addCallback(self.parse_job_id)
265 d.addCallback(self.parse_job_id)
253 #d.addErrback(self.handle_error)
266 #d.addErrback(self.handle_error)
254 return d
267 return d
255
268
256 def kill(self):
269 def kill(self):
257 d = getProcessOutput(self.delete_command,
270 d = getProcessOutput(self.delete_command,
258 [self.job_id],env=os.environ)
271 [self.job_id],env=os.environ)
259 return d
272 return d
260
273
261 class PBSEngineSet(BatchEngineSet):
274 class PBSEngineSet(BatchEngineSet):
262
275
263 submit_command = 'qsub'
276 submit_command = 'qsub'
264 delete_command = 'qdel'
277 delete_command = 'qdel'
265 job_id_regexp = '\d+'
278 job_id_regexp = '\d+'
266
279
267 def __init__(self, template_file, **kwargs):
280 def __init__(self, template_file, **kwargs):
268 BatchEngineSet.__init__(self, template_file, **kwargs)
281 BatchEngineSet.__init__(self, template_file, **kwargs)
269
282
270
283
271 def main_local(args):
284 def main_local(args):
272 cont_args = []
285 cont_args = []
273 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
286 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
287 if args.x:
288 cont_args.append('-x')
289 if args.y:
290 cont_args.append('-y')
274 cl = ControllerLauncher(extra_args=cont_args)
291 cl = ControllerLauncher(extra_args=cont_args)
275 dstart = cl.start()
292 dstart = cl.start()
276 def start_engines(cont_pid):
293 def start_engines(cont_pid):
277 engine_args = []
294 engine_args = []
278 engine_args.append('--logfile=%s' % \
295 engine_args.append('--logfile=%s' % \
279 pjoin(args.logdir,'ipengine%s-' % cont_pid))
296 pjoin(args.logdir,'ipengine%s-' % cont_pid))
280 eset = LocalEngineSet(extra_args=engine_args)
297 eset = LocalEngineSet(extra_args=engine_args)
281 def shutdown(signum, frame):
298 def shutdown(signum, frame):
282 log.msg('Stopping local cluster')
299 log.msg('Stopping local cluster')
283 # We are still playing with the times here, but these seem
300 # We are still playing with the times here, but these seem
284 # to be reliable in allowing everything to exit cleanly.
301 # to be reliable in allowing everything to exit cleanly.
285 eset.interrupt_then_kill(0.5)
302 eset.interrupt_then_kill(0.5)
286 cl.interrupt_then_kill(0.5)
303 cl.interrupt_then_kill(0.5)
287 reactor.callLater(1.0, reactor.stop)
304 reactor.callLater(1.0, reactor.stop)
288 signal.signal(signal.SIGINT,shutdown)
305 signal.signal(signal.SIGINT,shutdown)
289 d = eset.start(args.n)
306 d = eset.start(args.n)
290 return d
307 return d
291 def delay_start(cont_pid):
308 def delay_start(cont_pid):
292 # This is needed because the controller doesn't start listening
309 # This is needed because the controller doesn't start listening
293 # right when it starts and the controller needs to write
310 # right when it starts and the controller needs to write
294 # furl files for the engine to pick up
311 # furl files for the engine to pick up
295 reactor.callLater(1.0, start_engines, cont_pid)
312 reactor.callLater(1.0, start_engines, cont_pid)
296 dstart.addCallback(delay_start)
313 dstart.addCallback(delay_start)
297 dstart.addErrback(lambda f: f.raiseException())
314 dstart.addErrback(lambda f: f.raiseException())
298
315
299 def main_mpirun(args):
316 def main_mpirun(args):
300 cont_args = []
317 cont_args = []
301 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
318 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
319 if args.x:
320 cont_args.append('-x')
321 if args.y:
322 cont_args.append('-y')
302 cl = ControllerLauncher(extra_args=cont_args)
323 cl = ControllerLauncher(extra_args=cont_args)
303 dstart = cl.start()
324 dstart = cl.start()
304 def start_engines(cont_pid):
325 def start_engines(cont_pid):
305 raw_args = ['mpirun']
326 raw_args = ['mpirun']
306 raw_args.extend(['-n',str(args.n)])
327 raw_args.extend(['-n',str(args.n)])
307 raw_args.append('ipengine')
328 raw_args.append('ipengine')
308 raw_args.append('-l')
329 raw_args.append('-l')
309 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
330 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
310 raw_args.append('--mpi=%s' % args.mpi)
331 raw_args.append('--mpi=%s' % args.mpi)
311 eset = ProcessLauncher(raw_args)
332 eset = ProcessLauncher(raw_args)
312 def shutdown(signum, frame):
333 def shutdown(signum, frame):
313 log.msg('Stopping local cluster')
334 log.msg('Stopping local cluster')
314 # We are still playing with the times here, but these seem
335 # We are still playing with the times here, but these seem
315 # to be reliable in allowing everything to exit cleanly.
336 # to be reliable in allowing everything to exit cleanly.
316 eset.interrupt_then_kill(1.0)
337 eset.interrupt_then_kill(1.0)
317 cl.interrupt_then_kill(1.0)
338 cl.interrupt_then_kill(1.0)
318 reactor.callLater(2.0, reactor.stop)
339 reactor.callLater(2.0, reactor.stop)
319 signal.signal(signal.SIGINT,shutdown)
340 signal.signal(signal.SIGINT,shutdown)
320 d = eset.start()
341 d = eset.start()
321 return d
342 return d
322 def delay_start(cont_pid):
343 def delay_start(cont_pid):
323 # This is needed because the controller doesn't start listening
344 # This is needed because the controller doesn't start listening
324 # right when it starts and the controller needs to write
345 # right when it starts and the controller needs to write
325 # furl files for the engine to pick up
346 # furl files for the engine to pick up
326 reactor.callLater(1.0, start_engines, cont_pid)
347 reactor.callLater(1.0, start_engines, cont_pid)
327 dstart.addCallback(delay_start)
348 dstart.addCallback(delay_start)
328 dstart.addErrback(lambda f: f.raiseException())
349 dstart.addErrback(lambda f: f.raiseException())
329
350
330 def main_pbs(args):
351 def main_pbs(args):
331 cl = ControllerLauncher()
352 cl = ControllerLauncher()
332 dstart = cl.start()
353 dstart = cl.start()
333 def start_engines(r):
354 def start_engines(r):
334 pbs_set = PBSEngineSet('pbs.template')
355 pbs_set = PBSEngineSet('pbs.template')
335 print pbs_set.template_file
356 print pbs_set.template_file
336 d = pbs_set.start(args.n)
357 d = pbs_set.start(args.n)
337 return d
358 return d
338 dstart.addCallback(start_engines)
359 dstart.addCallback(start_engines)
339 dstart.addErrback(lambda f: f.printTraceback())
360 dstart.addErrback(lambda f: f.printTraceback())
340
361
341
362
342 def get_args():
363 def get_args():
343 parser = argparse.ArgumentParser(
364 base_parser = argparse.ArgumentParser(add_help=False)
344 description='IPython cluster startup')
365 base_parser.add_argument(
345 newopt = parser.add_argument # shorthand
366 '-x',
367 action='store_true',
368 dest='x',
369 help='turn off client security'
370 )
371 base_parser.add_argument(
372 '-y',
373 action='store_true',
374 dest='y',
375 help='turn off engine security'
376 )
377 base_parser.add_argument(
378 "--logdir",
379 type=str,
380 dest="logdir",
381 help="directory to put log files (default=$IPYTHONDIR/log)",
382 default=pjoin(get_ipython_dir(),'log')
383 )
384 base_parser.add_argument(
385 "-n",
386 "--num",
387 type=int,
388 dest="n",
389 default=2,
390 help="the number of engines to start"
391 )
346
392
347 subparsers = parser.add_subparsers(help='sub-command help')
393 parser = argparse.ArgumentParser(
394 description='IPython cluster startup. This starts a controller and\
395 engines using various approaches'
396 )
397 subparsers = parser.add_subparsers(
398 help='available cluster types. For help, do "ipcluster TYPE --help"')
348
399
349 parser_local = subparsers.add_parser('local', help='run a local cluster')
400 parser_local = subparsers.add_parser(
350 parser_local.add_argument("--logdir", type=str, dest="logdir",
401 'local',
351 help="directory to put log files (default=$IPYTHONDIR/log)",
402 help='run a local cluster',
352 default=pjoin(get_ipython_dir(),'log'))
403 parents=[base_parser]
353 parser_local.add_argument("-n", "--num", type=int, dest="n",default=2,
404 )
354 help="the number of engines to start")
355 parser_local.set_defaults(func=main_local)
405 parser_local.set_defaults(func=main_local)
356
406
357 parser_local = subparsers.add_parser('mpirun', help='run a cluster using mpirun')
407 parser_mpirun = subparsers.add_parser(
358 parser_local.add_argument("--logdir", type=str, dest="logdir",
408 'mpirun',
359 help="directory to put log files (default=$IPYTHONDIR/log)",
409 help='run a cluster using mpirun',
360 default=pjoin(get_ipython_dir(),'log'))
410 parents=[base_parser]
361 parser_local.add_argument("-n", "--num", type=int, dest="n",default=2,
411 )
362 help="the number of engines to start")
412 parser_mpirun.add_argument(
363 parser_local.add_argument("--mpi", type=str, dest="mpi",default='mpi4py',
413 "--mpi",
364 help="how to call MPI_Init (default=mpi4py)")
414 type=str,
365 parser_local.set_defaults(func=main_mpirun)
415 dest="mpi",
416 default='mpi4py',
417 help="how to call MPI_Init (default=mpi4py)"
418 )
419 parser_mpirun.set_defaults(func=main_mpirun)
366
420
367 parser_pbs = subparsers.add_parser('pbs', help='run a pbs cluster')
421 parser_pbs = subparsers.add_parser('pbs', help='run a pbs cluster')
368 parser_pbs.add_argument('--pbs-script', type=str, dest='pbsscript',
422 parser_pbs.add_argument('--pbs-script', type=str, dest='pbsscript',
369 help='PBS script template')
423 help='PBS script template')
370 parser_pbs.set_defaults(func=main_pbs)
424 parser_pbs.set_defaults(func=main_pbs)
371 args = parser.parse_args()
425 args = parser.parse_args()
372 return args
426 return args
373
427
374 def main():
428 def main():
375 args = get_args()
429 args = get_args()
376 reactor.callWhenRunning(args.func, args)
430 reactor.callWhenRunning(args.func, args)
377 log.startLogging(sys.stdout)
431 log.startLogging(sys.stdout)
378 reactor.run()
432 reactor.run()
379
433
380 if __name__ == '__main__':
434 if __name__ == '__main__':
381 main()
435 main()
General Comments 0
You need to be logged in to leave comments. Login now