##// END OF EJS Templates
Fixing numerous bugs in ipcluster on Win32....
Administrator -
Show More
@@ -1,489 +1,519 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
24 from twisted.internet.protocol import ProcessProtocol
24 from twisted.internet.protocol import ProcessProtocol
25 from twisted.python import failure, log
25 from twisted.python import failure, log
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28
28
29 from IPython.external import argparse
29 from IPython.external import argparse
30 from IPython.external import Itpl
30 from IPython.external import Itpl
31 from IPython.kernel.twistedutil import gatherBoth
31 from IPython.kernel.twistedutil import gatherBoth
32 from IPython.kernel.util import printer
32 from IPython.kernel.util import printer
33 from IPython.genutils import get_ipython_dir, num_cpus
33 from IPython.genutils import get_ipython_dir, num_cpus
34 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.error import SecurityError
34
36
35 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
36 # General process handling code
38 # General process handling code
37 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
38
40
39 def find_exe(cmd):
41 def find_exe(cmd):
40 try:
42 try:
41 import win32api
43 import win32api
42 except ImportError:
44 except ImportError:
43 raise ImportError('you need to have pywin32 installed for this to work')
45 raise ImportError('you need to have pywin32 installed for this to work')
44 else:
46 else:
45 try:
47 try:
46 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
48 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
47 except:
49 except:
48 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
50 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
49 return path
51 return path
50
52
51 class ProcessStateError(Exception):
53 class ProcessStateError(Exception):
52 pass
54 pass
53
55
54 class UnknownStatus(Exception):
56 class UnknownStatus(Exception):
55 pass
57 pass
56
58
57 class LauncherProcessProtocol(ProcessProtocol):
59 class LauncherProcessProtocol(ProcessProtocol):
58 """
60 """
59 A ProcessProtocol to go with the ProcessLauncher.
61 A ProcessProtocol to go with the ProcessLauncher.
60 """
62 """
61 def __init__(self, process_launcher):
63 def __init__(self, process_launcher):
62 self.process_launcher = process_launcher
64 self.process_launcher = process_launcher
63
65
64 def connectionMade(self):
66 def connectionMade(self):
65 self.process_launcher.fire_start_deferred(self.transport.pid)
67 self.process_launcher.fire_start_deferred(self.transport.pid)
66
68
67 def processEnded(self, status):
69 def processEnded(self, status):
68 value = status.value
70 value = status.value
69 if isinstance(value, ProcessDone):
71 if isinstance(value, ProcessDone):
70 self.process_launcher.fire_stop_deferred(0)
72 self.process_launcher.fire_stop_deferred(0)
71 elif isinstance(value, ProcessTerminated):
73 elif isinstance(value, ProcessTerminated):
72 self.process_launcher.fire_stop_deferred(
74 self.process_launcher.fire_stop_deferred(
73 {'exit_code':value.exitCode,
75 {'exit_code':value.exitCode,
74 'signal':value.signal,
76 'signal':value.signal,
75 'status':value.status
77 'status':value.status
76 }
78 }
77 )
79 )
78 else:
80 else:
79 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
81 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
80
82
81 def outReceived(self, data):
83 def outReceived(self, data):
82 log.msg(data)
84 log.msg(data)
83
85
84 def errReceived(self, data):
86 def errReceived(self, data):
85 log.err(data)
87 log.err(data)
86
88
87 class ProcessLauncher(object):
89 class ProcessLauncher(object):
88 """
90 """
89 Start and stop an external process in an asynchronous manner.
91 Start and stop an external process in an asynchronous manner.
90
92
91 Currently this uses deferreds to notify other parties of process state
93 Currently this uses deferreds to notify other parties of process state
92 changes. This is an awkward design and should be moved to using
94 changes. This is an awkward design and should be moved to using
93 a formal NotificationCenter.
95 a formal NotificationCenter.
94 """
96 """
95 def __init__(self, cmd_and_args):
97 def __init__(self, cmd_and_args):
96 self.cmd = cmd_and_args[0]
98 self.cmd = cmd_and_args[0]
97 self.args = cmd_and_args
99 self.args = cmd_and_args
98 self._reset()
100 self._reset()
99
101
100 def _reset(self):
102 def _reset(self):
101 self.process_protocol = None
103 self.process_protocol = None
102 self.pid = None
104 self.pid = None
103 self.start_deferred = None
105 self.start_deferred = None
104 self.stop_deferreds = []
106 self.stop_deferreds = []
105 self.state = 'before' # before, running, or after
107 self.state = 'before' # before, running, or after
106
108
107 @property
109 @property
108 def running(self):
110 def running(self):
109 if self.state == 'running':
111 if self.state == 'running':
110 return True
112 return True
111 else:
113 else:
112 return False
114 return False
113
115
114 def fire_start_deferred(self, pid):
116 def fire_start_deferred(self, pid):
115 self.pid = pid
117 self.pid = pid
116 self.state = 'running'
118 self.state = 'running'
117 log.msg('Process %r has started with pid=%i' % (self.args, pid))
119 log.msg('Process %r has started with pid=%i' % (self.args, pid))
118 self.start_deferred.callback(pid)
120 self.start_deferred.callback(pid)
119
121
120 def start(self):
122 def start(self):
121 if self.state == 'before':
123 if self.state == 'before':
122 self.process_protocol = LauncherProcessProtocol(self)
124 self.process_protocol = LauncherProcessProtocol(self)
123 self.start_deferred = defer.Deferred()
125 self.start_deferred = defer.Deferred()
124 self.process_transport = reactor.spawnProcess(
126 self.process_transport = reactor.spawnProcess(
125 self.process_protocol,
127 self.process_protocol,
126 self.cmd,
128 self.cmd,
127 self.args,
129 self.args,
128 env=os.environ
130 env=os.environ
129 )
131 )
130 return self.start_deferred
132 return self.start_deferred
131 else:
133 else:
132 s = 'the process has already been started and has state: %r' % \
134 s = 'the process has already been started and has state: %r' % \
133 self.state
135 self.state
134 return defer.fail(ProcessStateError(s))
136 return defer.fail(ProcessStateError(s))
135
137
136 def get_stop_deferred(self):
138 def get_stop_deferred(self):
137 if self.state == 'running' or self.state == 'before':
139 if self.state == 'running' or self.state == 'before':
138 d = defer.Deferred()
140 d = defer.Deferred()
139 self.stop_deferreds.append(d)
141 self.stop_deferreds.append(d)
140 return d
142 return d
141 else:
143 else:
142 s = 'this process is already complete'
144 s = 'this process is already complete'
143 return defer.fail(ProcessStateError(s))
145 return defer.fail(ProcessStateError(s))
144
146
145 def fire_stop_deferred(self, exit_code):
147 def fire_stop_deferred(self, exit_code):
146 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
148 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
147 self.state = 'after'
149 self.state = 'after'
148 for d in self.stop_deferreds:
150 for d in self.stop_deferreds:
149 d.callback(exit_code)
151 d.callback(exit_code)
150
152
151 def signal(self, sig):
153 def signal(self, sig):
152 """
154 """
153 Send a signal to the process.
155 Send a signal to the process.
154
156
155 The argument sig can be ('KILL','INT', etc.) or any signal number.
157 The argument sig can be ('KILL','INT', etc.) or any signal number.
156 """
158 """
157 if self.state == 'running':
159 if self.state == 'running':
158 self.process_transport.signalProcess(sig)
160 self.process_transport.signalProcess(sig)
159
161
160 # def __del__(self):
162 # def __del__(self):
161 # self.signal('KILL')
163 # self.signal('KILL')
162
164
163 def interrupt_then_kill(self, delay=1.0):
165 def interrupt_then_kill(self, delay=1.0):
164 self.signal('INT')
166 self.signal('INT')
165 reactor.callLater(delay, self.signal, 'KILL')
167 reactor.callLater(delay, self.signal, 'KILL')
166
168
167
169
168 #-----------------------------------------------------------------------------
170 #-----------------------------------------------------------------------------
169 # Code for launching controller and engines
171 # Code for launching controller and engines
170 #-----------------------------------------------------------------------------
172 #-----------------------------------------------------------------------------
171
173
172
174
173 class ControllerLauncher(ProcessLauncher):
175 class ControllerLauncher(ProcessLauncher):
174
176
175 def __init__(self, extra_args=None):
177 def __init__(self, extra_args=None):
176 if sys.platform == 'win32':
178 if sys.platform == 'win32':
177 args = [find_exe('ipcontroller')]
179 # This logic is needed because the ipcontroller script doesn't
180 # always get installed in the same way or in the same location.
181 from IPython.kernel.scripts import ipcontroller
182 script_location = ipcontroller.__file__.replace('.pyc', '.py')
183 # The -u option here turns on unbuffered output, which is required
184 # on Win32 to prevent weird conflicts and problems with Twisted
185 args = [find_exe('python'), '-u', script_location]
178 else:
186 else:
179 args = ['ipcontroller']
187 args = ['ipcontroller']
180 self.extra_args = extra_args
188 self.extra_args = extra_args
181 if extra_args is not None:
189 if extra_args is not None:
182 args.extend(extra_args)
190 args.extend(extra_args)
183
191
184 ProcessLauncher.__init__(self, args)
192 ProcessLauncher.__init__(self, args)
185
193
186
194
187 class EngineLauncher(ProcessLauncher):
195 class EngineLauncher(ProcessLauncher):
188
196
189 def __init__(self, extra_args=None):
197 def __init__(self, extra_args=None):
190 if sys.platform == 'win32':
198 if sys.platform == 'win32':
191 args = [find_exe('ipengine')]
199 # This logic is needed because the ipengine script doesn't
200 # always get installed in the same way or in the same location.
201 from IPython.kernel.scripts import ipengine
202 script_location = ipengine.__file__.replace('.pyc', '.py')
203 # The -u option here turns on unbuffered output, which is required
204 # on Win32 to prevent weird conflicts and problems with Twisted
205 args = [find_exe('python'), '-u', script_location]
192 else:
206 else:
193 args = ['ipengine']
207 args = ['ipengine']
194 self.extra_args = extra_args
208 self.extra_args = extra_args
195 if extra_args is not None:
209 if extra_args is not None:
196 args.extend(extra_args)
210 args.extend(extra_args)
197
211
198 ProcessLauncher.__init__(self, args)
212 ProcessLauncher.__init__(self, args)
199
213
200
214
201 class LocalEngineSet(object):
215 class LocalEngineSet(object):
202
216
203 def __init__(self, extra_args=None):
217 def __init__(self, extra_args=None):
204 self.extra_args = extra_args
218 self.extra_args = extra_args
205 self.launchers = []
219 self.launchers = []
206
220
207 def start(self, n):
221 def start(self, n):
208 dlist = []
222 dlist = []
209 for i in range(n):
223 for i in range(n):
210 el = EngineLauncher(extra_args=self.extra_args)
224 el = EngineLauncher(extra_args=self.extra_args)
211 d = el.start()
225 d = el.start()
212 self.launchers.append(el)
226 self.launchers.append(el)
213 dlist.append(d)
227 dlist.append(d)
214 dfinal = gatherBoth(dlist, consumeErrors=True)
228 dfinal = gatherBoth(dlist, consumeErrors=True)
215 dfinal.addCallback(self._handle_start)
229 dfinal.addCallback(self._handle_start)
216 return dfinal
230 return dfinal
217
231
218 def _handle_start(self, r):
232 def _handle_start(self, r):
219 log.msg('Engines started with pids: %r' % r)
233 log.msg('Engines started with pids: %r' % r)
220 return r
234 return r
221
235
222 def _handle_stop(self, r):
236 def _handle_stop(self, r):
223 log.msg('Engines received signal: %r' % r)
237 log.msg('Engines received signal: %r' % r)
224 return r
238 return r
225
239
226 def signal(self, sig):
240 def signal(self, sig):
227 dlist = []
241 dlist = []
228 for el in self.launchers:
242 for el in self.launchers:
229 d = el.get_stop_deferred()
243 d = el.get_stop_deferred()
230 dlist.append(d)
244 dlist.append(d)
231 el.signal(sig)
245 el.signal(sig)
232 dfinal = gatherBoth(dlist, consumeErrors=True)
246 dfinal = gatherBoth(dlist, consumeErrors=True)
233 dfinal.addCallback(self._handle_stop)
247 dfinal.addCallback(self._handle_stop)
234 return dfinal
248 return dfinal
235
249
236 def interrupt_then_kill(self, delay=1.0):
250 def interrupt_then_kill(self, delay=1.0):
237 dlist = []
251 dlist = []
238 for el in self.launchers:
252 for el in self.launchers:
239 d = el.get_stop_deferred()
253 d = el.get_stop_deferred()
240 dlist.append(d)
254 dlist.append(d)
241 el.interrupt_then_kill(delay)
255 el.interrupt_then_kill(delay)
242 dfinal = gatherBoth(dlist, consumeErrors=True)
256 dfinal = gatherBoth(dlist, consumeErrors=True)
243 dfinal.addCallback(self._handle_stop)
257 dfinal.addCallback(self._handle_stop)
244 return dfinal
258 return dfinal
245
259
246
260
247 class BatchEngineSet(object):
261 class BatchEngineSet(object):
248
262
249 # Subclasses must fill these in. See PBSEngineSet
263 # Subclasses must fill these in. See PBSEngineSet
250 submit_command = ''
264 submit_command = ''
251 delete_command = ''
265 delete_command = ''
252 job_id_regexp = ''
266 job_id_regexp = ''
253
267
254 def __init__(self, template_file, **kwargs):
268 def __init__(self, template_file, **kwargs):
255 self.template_file = template_file
269 self.template_file = template_file
256 self.context = {}
270 self.context = {}
257 self.context.update(kwargs)
271 self.context.update(kwargs)
258 self.batch_file = self.template_file+'-run'
272 self.batch_file = self.template_file+'-run'
259
273
260 def parse_job_id(self, output):
274 def parse_job_id(self, output):
261 m = re.match(self.job_id_regexp, output)
275 m = re.match(self.job_id_regexp, output)
262 if m is not None:
276 if m is not None:
263 job_id = m.group()
277 job_id = m.group()
264 else:
278 else:
265 raise Exception("job id couldn't be determined: %s" % output)
279 raise Exception("job id couldn't be determined: %s" % output)
266 self.job_id = job_id
280 self.job_id = job_id
267 log.msg('Job started with job id: %r' % job_id)
281 log.msg('Job started with job id: %r' % job_id)
268 return job_id
282 return job_id
269
283
270 def write_batch_script(self, n):
284 def write_batch_script(self, n):
271 self.context['n'] = n
285 self.context['n'] = n
272 template = open(self.template_file, 'r').read()
286 template = open(self.template_file, 'r').read()
273 log.msg('Using template for batch script: %s' % self.template_file)
287 log.msg('Using template for batch script: %s' % self.template_file)
274 script_as_string = Itpl.itplns(template, self.context)
288 script_as_string = Itpl.itplns(template, self.context)
275 log.msg('Writing instantiated batch script: %s' % self.batch_file)
289 log.msg('Writing instantiated batch script: %s' % self.batch_file)
276 f = open(self.batch_file,'w')
290 f = open(self.batch_file,'w')
277 f.write(script_as_string)
291 f.write(script_as_string)
278 f.close()
292 f.close()
279
293
280 def handle_error(self, f):
294 def handle_error(self, f):
281 f.printTraceback()
295 f.printTraceback()
282 f.raiseException()
296 f.raiseException()
283
297
284 def start(self, n):
298 def start(self, n):
285 self.write_batch_script(n)
299 self.write_batch_script(n)
286 d = getProcessOutput(self.submit_command,
300 d = getProcessOutput(self.submit_command,
287 [self.batch_file],env=os.environ)
301 [self.batch_file],env=os.environ)
288 d.addCallback(self.parse_job_id)
302 d.addCallback(self.parse_job_id)
289 d.addErrback(self.handle_error)
303 d.addErrback(self.handle_error)
290 return d
304 return d
291
305
292 def kill(self):
306 def kill(self):
293 d = getProcessOutput(self.delete_command,
307 d = getProcessOutput(self.delete_command,
294 [self.job_id],env=os.environ)
308 [self.job_id],env=os.environ)
295 return d
309 return d
296
310
297 class PBSEngineSet(BatchEngineSet):
311 class PBSEngineSet(BatchEngineSet):
298
312
299 submit_command = 'qsub'
313 submit_command = 'qsub'
300 delete_command = 'qdel'
314 delete_command = 'qdel'
301 job_id_regexp = '\d+'
315 job_id_regexp = '\d+'
302
316
303 def __init__(self, template_file, **kwargs):
317 def __init__(self, template_file, **kwargs):
304 BatchEngineSet.__init__(self, template_file, **kwargs)
318 BatchEngineSet.__init__(self, template_file, **kwargs)
305
319
306
320
307 #-----------------------------------------------------------------------------
321 #-----------------------------------------------------------------------------
308 # Main functions for the different types of clusters
322 # Main functions for the different types of clusters
309 #-----------------------------------------------------------------------------
323 #-----------------------------------------------------------------------------
310
324
311 # TODO:
325 # TODO:
312 # The logic in these functions should be moved into classes like LocalCluster,
326 # The logic in these functions should be moved into classes like LocalCluster,
313 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
327 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
314 # The main functions should then just parse the command line arguments, create
328 # The main functions should then just parse the command line arguments, create
315 # the appropriate class and call a 'start' method.
329 # the appropriate class and call a 'start' method.
316
330
317 def main_local(args):
331 def check_security(args, cont_args):
318 cont_args = []
332 if (not args.x or not args.y) and not have_crypto:
319 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
333 log.err("""
334 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
335 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
336 reactor.stop()
337 return False
320 if args.x:
338 if args.x:
321 cont_args.append('-x')
339 cont_args.append('-x')
322 if args.y:
340 if args.y:
323 cont_args.append('-y')
341 cont_args.append('-y')
342 return True
343
344 def main_local(args):
345 cont_args = []
346 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
347
348 # Check security settings before proceeding
349 keep_going = check_security(args, cont_args)
350 if not keep_going: return
351
324 cl = ControllerLauncher(extra_args=cont_args)
352 cl = ControllerLauncher(extra_args=cont_args)
325 dstart = cl.start()
353 dstart = cl.start()
326 def start_engines(cont_pid):
354 def start_engines(cont_pid):
327 engine_args = []
355 engine_args = []
328 engine_args.append('--logfile=%s' % \
356 engine_args.append('--logfile=%s' % \
329 pjoin(args.logdir,'ipengine%s-' % cont_pid))
357 pjoin(args.logdir,'ipengine%s-' % cont_pid))
330 eset = LocalEngineSet(extra_args=engine_args)
358 eset = LocalEngineSet(extra_args=engine_args)
331 def shutdown(signum, frame):
359 def shutdown(signum, frame):
332 log.msg('Stopping local cluster')
360 log.msg('Stopping local cluster')
333 # We are still playing with the times here, but these seem
361 # We are still playing with the times here, but these seem
334 # to be reliable in allowing everything to exit cleanly.
362 # to be reliable in allowing everything to exit cleanly.
335 eset.interrupt_then_kill(0.5)
363 eset.interrupt_then_kill(0.5)
336 cl.interrupt_then_kill(0.5)
364 cl.interrupt_then_kill(0.5)
337 reactor.callLater(1.0, reactor.stop)
365 reactor.callLater(1.0, reactor.stop)
338 signal.signal(signal.SIGINT,shutdown)
366 signal.signal(signal.SIGINT,shutdown)
339 d = eset.start(args.n)
367 d = eset.start(args.n)
340 return d
368 return d
341 def delay_start(cont_pid):
369 def delay_start(cont_pid):
342 # This is needed because the controller doesn't start listening
370 # This is needed because the controller doesn't start listening
343 # right when it starts and the controller needs to write
371 # right when it starts and the controller needs to write
344 # furl files for the engine to pick up
372 # furl files for the engine to pick up
345 reactor.callLater(1.0, start_engines, cont_pid)
373 reactor.callLater(1.0, start_engines, cont_pid)
346 dstart.addCallback(delay_start)
374 dstart.addCallback(delay_start)
347 dstart.addErrback(lambda f: f.raiseException())
375 dstart.addErrback(lambda f: f.raiseException())
348
376
349 def main_mpirun(args):
377 def main_mpirun(args):
350 cont_args = []
378 cont_args = []
351 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
379 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
352 if args.x:
380
353 cont_args.append('-x')
381 # Check security settings before proceeding
354 if args.y:
382 keep_going = check_security(args, cont_args)
355 cont_args.append('-y')
383 if not keep_going: return
384
356 cl = ControllerLauncher(extra_args=cont_args)
385 cl = ControllerLauncher(extra_args=cont_args)
357 dstart = cl.start()
386 dstart = cl.start()
358 def start_engines(cont_pid):
387 def start_engines(cont_pid):
359 raw_args = ['mpirun']
388 raw_args = ['mpirun']
360 raw_args.extend(['-n',str(args.n)])
389 raw_args.extend(['-n',str(args.n)])
361 raw_args.append('ipengine')
390 raw_args.append('ipengine')
362 raw_args.append('-l')
391 raw_args.append('-l')
363 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
392 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
364 if args.mpi:
393 if args.mpi:
365 raw_args.append('--mpi=%s' % args.mpi)
394 raw_args.append('--mpi=%s' % args.mpi)
366 eset = ProcessLauncher(raw_args)
395 eset = ProcessLauncher(raw_args)
367 def shutdown(signum, frame):
396 def shutdown(signum, frame):
368 log.msg('Stopping local cluster')
397 log.msg('Stopping local cluster')
369 # We are still playing with the times here, but these seem
398 # We are still playing with the times here, but these seem
370 # to be reliable in allowing everything to exit cleanly.
399 # to be reliable in allowing everything to exit cleanly.
371 eset.interrupt_then_kill(1.0)
400 eset.interrupt_then_kill(1.0)
372 cl.interrupt_then_kill(1.0)
401 cl.interrupt_then_kill(1.0)
373 reactor.callLater(2.0, reactor.stop)
402 reactor.callLater(2.0, reactor.stop)
374 signal.signal(signal.SIGINT,shutdown)
403 signal.signal(signal.SIGINT,shutdown)
375 d = eset.start()
404 d = eset.start()
376 return d
405 return d
377 def delay_start(cont_pid):
406 def delay_start(cont_pid):
378 # This is needed because the controller doesn't start listening
407 # This is needed because the controller doesn't start listening
379 # right when it starts and the controller needs to write
408 # right when it starts and the controller needs to write
380 # furl files for the engine to pick up
409 # furl files for the engine to pick up
381 reactor.callLater(1.0, start_engines, cont_pid)
410 reactor.callLater(1.0, start_engines, cont_pid)
382 dstart.addCallback(delay_start)
411 dstart.addCallback(delay_start)
383 dstart.addErrback(lambda f: f.raiseException())
412 dstart.addErrback(lambda f: f.raiseException())
384
413
385 def main_pbs(args):
414 def main_pbs(args):
386 cont_args = []
415 cont_args = []
387 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
416 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
388 if args.x:
417
389 cont_args.append('-x')
418 # Check security settings before proceeding
390 if args.y:
419 keep_going = check_security(args, cont_args)
391 cont_args.append('-y')
420 if not keep_going: return
421
392 cl = ControllerLauncher(extra_args=cont_args)
422 cl = ControllerLauncher(extra_args=cont_args)
393 dstart = cl.start()
423 dstart = cl.start()
394 def start_engines(r):
424 def start_engines(r):
395 pbs_set = PBSEngineSet(args.pbsscript)
425 pbs_set = PBSEngineSet(args.pbsscript)
396 def shutdown(signum, frame):
426 def shutdown(signum, frame):
397 log.msg('Stopping pbs cluster')
427 log.msg('Stopping pbs cluster')
398 d = pbs_set.kill()
428 d = pbs_set.kill()
399 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
429 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
400 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
430 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
401 signal.signal(signal.SIGINT,shutdown)
431 signal.signal(signal.SIGINT,shutdown)
402 d = pbs_set.start(args.n)
432 d = pbs_set.start(args.n)
403 return d
433 return d
404 dstart.addCallback(start_engines)
434 dstart.addCallback(start_engines)
405 dstart.addErrback(lambda f: f.raiseException())
435 dstart.addErrback(lambda f: f.raiseException())
406
436
407
437
408 def get_args():
438 def get_args():
409 base_parser = argparse.ArgumentParser(add_help=False)
439 base_parser = argparse.ArgumentParser(add_help=False)
410 base_parser.add_argument(
440 base_parser.add_argument(
411 '-x',
441 '-x',
412 action='store_true',
442 action='store_true',
413 dest='x',
443 dest='x',
414 help='turn off client security'
444 help='turn off client security'
415 )
445 )
416 base_parser.add_argument(
446 base_parser.add_argument(
417 '-y',
447 '-y',
418 action='store_true',
448 action='store_true',
419 dest='y',
449 dest='y',
420 help='turn off engine security'
450 help='turn off engine security'
421 )
451 )
422 base_parser.add_argument(
452 base_parser.add_argument(
423 "--logdir",
453 "--logdir",
424 type=str,
454 type=str,
425 dest="logdir",
455 dest="logdir",
426 help="directory to put log files (default=$IPYTHONDIR/log)",
456 help="directory to put log files (default=$IPYTHONDIR/log)",
427 default=pjoin(get_ipython_dir(),'log')
457 default=pjoin(get_ipython_dir(),'log')
428 )
458 )
429 base_parser.add_argument(
459 base_parser.add_argument(
430 "-n",
460 "-n",
431 "--num",
461 "--num",
432 type=int,
462 type=int,
433 dest="n",
463 dest="n",
434 default=2,
464 default=2,
435 help="the number of engines to start"
465 help="the number of engines to start"
436 )
466 )
437
467
438 parser = argparse.ArgumentParser(
468 parser = argparse.ArgumentParser(
439 description='IPython cluster startup. This starts a controller and\
469 description='IPython cluster startup. This starts a controller and\
440 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
470 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
441 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
471 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
442 )
472 )
443 subparsers = parser.add_subparsers(
473 subparsers = parser.add_subparsers(
444 help='available cluster types. For help, do "ipcluster TYPE --help"')
474 help='available cluster types. For help, do "ipcluster TYPE --help"')
445
475
446 parser_local = subparsers.add_parser(
476 parser_local = subparsers.add_parser(
447 'local',
477 'local',
448 help='run a local cluster',
478 help='run a local cluster',
449 parents=[base_parser]
479 parents=[base_parser]
450 )
480 )
451 parser_local.set_defaults(func=main_local)
481 parser_local.set_defaults(func=main_local)
452
482
453 parser_mpirun = subparsers.add_parser(
483 parser_mpirun = subparsers.add_parser(
454 'mpirun',
484 'mpirun',
455 help='run a cluster using mpirun',
485 help='run a cluster using mpirun',
456 parents=[base_parser]
486 parents=[base_parser]
457 )
487 )
458 parser_mpirun.add_argument(
488 parser_mpirun.add_argument(
459 "--mpi",
489 "--mpi",
460 type=str,
490 type=str,
461 dest="mpi", # Don't put a default here to allow no MPI support
491 dest="mpi", # Don't put a default here to allow no MPI support
462 help="how to call MPI_Init (default=mpi4py)"
492 help="how to call MPI_Init (default=mpi4py)"
463 )
493 )
464 parser_mpirun.set_defaults(func=main_mpirun)
494 parser_mpirun.set_defaults(func=main_mpirun)
465
495
466 parser_pbs = subparsers.add_parser(
496 parser_pbs = subparsers.add_parser(
467 'pbs',
497 'pbs',
468 help='run a pbs cluster',
498 help='run a pbs cluster',
469 parents=[base_parser]
499 parents=[base_parser]
470 )
500 )
471 parser_pbs.add_argument(
501 parser_pbs.add_argument(
472 '--pbs-script',
502 '--pbs-script',
473 type=str,
503 type=str,
474 dest='pbsscript',
504 dest='pbsscript',
475 help='PBS script template',
505 help='PBS script template',
476 default='pbs.template'
506 default='pbs.template'
477 )
507 )
478 parser_pbs.set_defaults(func=main_pbs)
508 parser_pbs.set_defaults(func=main_pbs)
479 args = parser.parse_args()
509 args = parser.parse_args()
480 return args
510 return args
481
511
482 def main():
512 def main():
483 args = get_args()
513 args = get_args()
484 reactor.callWhenRunning(args.func, args)
514 reactor.callWhenRunning(args.func, args)
485 log.startLogging(sys.stdout)
515 log.startLogging(sys.stdout)
486 reactor.run()
516 reactor.run()
487
517
488 if __name__ == '__main__':
518 if __name__ == '__main__':
489 main()
519 main()
General Comments 0
You need to be logged in to leave comments. Login now