##// END OF EJS Templates
Fixing command finding bug on Win32 in ipcluster.py
Administrator -
Show More
@@ -1,486 +1,489 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
24 from twisted.internet.protocol import ProcessProtocol
24 from twisted.internet.protocol import ProcessProtocol
25 from twisted.python import failure, log
25 from twisted.python import failure, log
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28
28
29 from IPython.external import argparse
29 from IPython.external import argparse
30 from IPython.external import Itpl
30 from IPython.external import Itpl
31 from IPython.kernel.twistedutil import gatherBoth
31 from IPython.kernel.twistedutil import gatherBoth
32 from IPython.kernel.util import printer
32 from IPython.kernel.util import printer
33 from IPython.genutils import get_ipython_dir, num_cpus
33 from IPython.genutils import get_ipython_dir, num_cpus
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # General process handling code
36 # General process handling code
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
def find_exe(cmd):
    """Locate an executable on the Windows search path.

    ``cmd`` is the bare command name, without an extension.  The name
    ``cmd + '.exe'`` is looked up first with ``win32api.SearchPath`` along
    ``os.environ['PATH']``; if that lookup fails, ``cmd + '.bat'`` is tried.

    Returns the path reported by ``win32api.SearchPath``.

    Raises ImportError when pywin32 is not installed.  When neither file can
    be found, the error raised by the ``.bat`` lookup propagates unchanged.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')

    search_path = os.environ['PATH']
    try:
        path, _offset = win32api.SearchPath(search_path, cmd + '.exe')
    except Exception:
        # Deliberately not a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are not swallowed while falling back to the .bat name.
        path, _offset = win32api.SearchPath(search_path, cmd + '.bat')
    return path
47
50
class ProcessStateError(Exception):
    """Raised when an operation is not valid for the process's current state."""
50
53
class UnknownStatus(Exception):
    """Raised when a process ends with a status that is not recognized."""
53
56
class LauncherProcessProtocol(ProcessProtocol):
    """
    ProcessProtocol that forwards child-process events to a ProcessLauncher.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # Report the pid of the spawned process to the owning launcher.
        child_pid = self.transport.pid
        self.process_launcher.fire_start_deferred(child_pid)

    def processEnded(self, status):
        """Translate the exit reason into a stop notification.

        A ProcessDone reason is reported as ``0``; a ProcessTerminated reason
        is reported as a dict with the exit code, signal and status.  Any
        other reason raises UnknownStatus.
        """
        reason = status.value
        launcher = self.process_launcher
        if isinstance(reason, ProcessDone):
            launcher.fire_stop_deferred(0)
            return
        if isinstance(reason, ProcessTerminated):
            launcher.fire_stop_deferred({
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
            })
            return
        raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # stdout of the child goes to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # stderr of the child is logged through log.err.
        log.err(data)
83
86
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Deferreds are used to tell other parties about process state changes.
    This is an awkward design and should be moved to using a formal
    NotificationCenter.
    """

    def __init__(self, cmd_and_args):
        # The first element is the executable; the full list is the argv.
        self.args = cmd_and_args
        self.cmd = cmd_and_args[0]
        self._reset()

    def _reset(self):
        """Put the launcher back into its initial, not-yet-started state."""
        self.state = 'before'  # before, running, or after
        self.pid = None
        self.process_protocol = None
        self.start_deferred = None
        self.stop_deferreds = []

    @property
    def running(self):
        """True exactly when the state is 'running'."""
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        """Record the pid, mark the process as running and fire start_deferred."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process.

        Returns a deferred that fires with the pid, or a failed deferred
        carrying ProcessStateError if the launcher was already started.
        """
        if self.state != 'before':
            message = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(message))

        # The protocol and the deferred must exist before the process is
        # spawned, since the protocol reports back through this launcher.
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """Return a new deferred that fires when the process stops.

        If the process is already complete, a failed deferred carrying
        ProcessStateError is returned instead.
        """
        if self.state not in ('running', 'before'):
            message = 'this process is already complete'
            return defer.fail(ProcessStateError(message))
        stop_deferred = defer.Deferred()
        self.stop_deferreds.append(stop_deferred)
        return stop_deferred

    def fire_stop_deferred(self, exit_code):
        """Mark the process as finished and fire every pending stop deferred."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for stop_deferred in self.stop_deferreds:
            stop_deferred.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Nothing happens unless the process is currently running.
        """
        if not self.state == 'running':
            return
        self.process_transport.signalProcess(sig)

    # A __del__ that sent 'KILL' was disabled in the original source:
    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """Send 'INT' now and schedule 'KILL' after ``delay`` seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
163
166
164
167
165 #-----------------------------------------------------------------------------
168 #-----------------------------------------------------------------------------
166 # Code for launching controller and engines
169 # Code for launching controller and engines
167 #-----------------------------------------------------------------------------
170 #-----------------------------------------------------------------------------
168
171
169
172
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run ``ipcontroller``."""

    def __init__(self, extra_args=None):
        # On win32 the full path has to be resolved through find_exe;
        # elsewhere the bare command name is used.
        on_windows = sys.platform == 'win32'
        executable = find_exe('ipcontroller') if on_windows else 'ipcontroller'

        command = [executable]
        self.extra_args = extra_args
        if extra_args is not None:
            command.extend(extra_args)

        ProcessLauncher.__init__(self, command)
182
185
183
186
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run ``ipengine``."""

    def __init__(self, extra_args=None):
        # On win32 the full path has to be resolved through find_exe;
        # elsewhere the bare command name is used.
        on_windows = sys.platform == 'win32'
        executable = find_exe('ipengine') if on_windows else 'ipengine'

        command = [executable]
        self.extra_args = extra_args
        if extra_args is not None:
            command.extend(extra_args)

        ProcessLauncher.__init__(self, command)
196
199
197
200
class LocalEngineSet(object):
    """A group of EngineLauncher processes that are started and stopped together."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start ``n`` engines.

        Returns the deferred produced by gatherBoth over the individual
        start deferreds, after it has passed through _handle_start.
        """
        start_deferreds = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            started = launcher.start()
            self.launchers.append(launcher)
            start_deferreds.append(started)
        gathered = gatherBoth(start_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_start)
        return gathered

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def _stop_each(self, action):
        """Apply ``action`` to every launcher and gather the stop deferreds.

        For each launcher the stop deferred is obtained before ``action`` is
        invoked on it.
        """
        stop_deferreds = []
        for launcher in self.launchers:
            stop_deferreds.append(launcher.get_stop_deferred())
            action(launcher)
        gathered = gatherBoth(stop_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_stop)
        return gathered

    def signal(self, sig):
        """Send ``sig`` to every engine; returns the gathered stop deferred."""
        return self._stop_each(lambda launcher: launcher.signal(sig))

    def interrupt_then_kill(self, delay=1.0):
        """Call interrupt_then_kill(delay) on every engine; returns the gathered stop deferred."""
        return self._stop_each(
            lambda launcher: launcher.interrupt_then_kill(delay))
242
245
243
246
class BatchEngineSet(object):
    """Start and stop a set of engines through a batch submission system.

    A template file is instantiated with Itpl into ``<template_file>-run``
    and handed to ``submit_command``; the job id parsed from the command's
    output is later passed to ``delete_command`` by kill().
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        self.template_file = template_file
        # Keyword arguments become the namespace used to fill in the template.
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract the job id from the submit command's output.

        ``job_id_regexp`` is matched at the start of ``output``.  The matched
        text is stored as ``self.job_id`` and returned.  Raises Exception if
        there is no match.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the template with ``n`` in the context and write it to batch_file."""
        self.context['n'] = n
        # Close the template explicitly; the handle used to be left to the
        # garbage collector.
        template_fh = open(self.template_file, 'r')
        try:
            template = template_fh.read()
        finally:
            template_fh.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        try:
            f.write(script_as_string)
        finally:
            # Make sure the file is closed even if the write fails.
            f.close()

    def handle_error(self, f):
        """Errback: print the failure's traceback, then re-raise its exception."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Write the batch script and submit it.

        Returns the deferred from getProcessOutput with parse_job_id added
        as callback and handle_error as errback.
        """
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
            [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Run delete_command on the stored job id; returns its output deferred.

        Reads ``self.job_id``, which is only set by parse_job_id.
        """
        d = getProcessOutput(self.delete_command,
            [self.job_id],env=os.environ)
        return d
293
296
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet that submits with ``qsub`` and deletes with ``qdel``."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string: '\d' in a plain literal is an invalid escape sequence.
    # The pattern is matched at the start of the submit command's output.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
302
305
303
306
304 #-----------------------------------------------------------------------------
307 #-----------------------------------------------------------------------------
305 # Main functions for the different types of clusters
308 # Main functions for the different types of clusters
306 #-----------------------------------------------------------------------------
309 #-----------------------------------------------------------------------------
307
310
# TODO:
# The logic in these functions should be moved into classes like LocalCluster,
# MpirunCluster, PBSCluster, etc.  This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
313
316
def main_local(args):
    """Start a controller and, one second later, ``args.n`` local engines.

    ``args`` is the parsed command line; ``logdir``, ``x``, ``y`` and ``n``
    are read from it.  A SIGINT handler that shuts everything down is
    installed just before the engines are started.
    """
    controller_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]
    if args.x:
        controller_args.append('-x')
    if args.y:
        controller_args.append('-y')
    controller = ControllerLauncher(extra_args=controller_args)
    controller_started = controller.start()

    def start_engines(cont_pid):
        engine_logfile = pjoin(args.logdir, 'ipengine%s-' % cont_pid)
        engines = LocalEngineSet(extra_args=['--logfile=%s' % engine_logfile])

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            engines.interrupt_then_kill(0.5)
            controller.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return engines.start(args.n)

    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)

    controller_started.addCallback(delay_start)
    controller_started.addErrback(lambda f: f.raiseException())
345
348
def main_mpirun(args):
    """Start a controller and, one second later, engines under ``mpirun``.

    ``args`` is the parsed command line; ``logdir``, ``x``, ``y``, ``n`` and
    ``mpi`` are read from it.  A SIGINT handler that shuts everything down
    is installed just before mpirun is started.
    """
    controller_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]
    if args.x:
        controller_args.append('-x')
    if args.y:
        controller_args.append('-y')
    controller = ControllerLauncher(extra_args=controller_args)
    controller_started = controller.start()

    def start_engines(cont_pid):
        mpirun_cmd = [
            'mpirun',
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir, 'ipengine%s-' % cont_pid),
        ]
        if args.mpi:
            mpirun_cmd.append('--mpi=%s' % args.mpi)
        engines = ProcessLauncher(mpirun_cmd)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            engines.interrupt_then_kill(1.0)
            controller.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return engines.start()

    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)

    controller_started.addCallback(delay_start)
    controller_started.addErrback(lambda f: f.raiseException())
381
384
def main_pbs(args):
    """Start a controller, then submit the engines as a PBS job.

    ``args`` is the parsed command line; ``logdir``, ``x``, ``y``, ``n`` and
    ``pbsscript`` are read from it.  A SIGINT handler that kills the job,
    stops the controller and then stops the reactor is installed just before
    the job is submitted.
    """
    controller_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]
    if args.x:
        controller_args.append('-x')
    if args.y:
        controller_args.append('-y')
    controller = ControllerLauncher(extra_args=controller_args)
    controller_started = controller.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)

        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            killed = pbs_set.kill()
            # addBoth: the controller and the reactor are stopped whether
            # or not deleting the job succeeded.
            killed.addBoth(lambda _: controller.interrupt_then_kill(1.0))
            killed.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    controller_started.addCallback(start_engines)
    controller_started.addErrback(lambda f: f.raiseException())
403
406
404
407
def get_args():
    """Build the ``ipcluster`` command line parser and parse the arguments.

    Options shared by all cluster types (``-x``, ``-y``, ``--logdir``,
    ``-n/--num``) live on a parent parser.  Each sub-command (``local``,
    ``mpirun``, ``pbs``) stores its entry point as ``func`` on the result.

    Returns the namespace produced by ``parser.parse_args()``.
    """
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    # Adjacent string literals instead of backslash continuations inside one
    # literal, so that source indentation cannot leak into the text.
    parser = argparse.ArgumentParser(
        description=(
            'IPython cluster startup. This starts a controller and '
            'engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND '
            'THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
        )
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpirun)

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)
    args = parser.parse_args()
    return args
478
481
def main():
    """Parse the command line and run the selected cluster type in the reactor."""
    parsed = get_args()
    # The chosen sub-command's function is called once the reactor is running.
    reactor.callWhenRunning(parsed.func, parsed)
    log.startLogging(sys.stdout)
    reactor.run()


if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now