##// END OF EJS Templates
Fixing numerous bugs in ipcluster on Win32....
Administrator -
Show More
@@ -1,489 +1,519 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 pjoin = os.path.join
22 22
23 23 from twisted.internet import reactor, defer
24 24 from twisted.internet.protocol import ProcessProtocol
25 25 from twisted.python import failure, log
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28
29 29 from IPython.external import argparse
30 30 from IPython.external import Itpl
31 31 from IPython.kernel.twistedutil import gatherBoth
32 32 from IPython.kernel.util import printer
33 33 from IPython.genutils import get_ipython_dir, num_cpus
34 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.error import SecurityError
34 36
35 37 #-----------------------------------------------------------------------------
36 38 # General process handling code
37 39 #-----------------------------------------------------------------------------
38 40
def find_exe(cmd):
    """Return the full path of the Windows executable for ``cmd``.

    The directories in the PATH environment variable are searched first for
    ``cmd + '.exe'`` and, if that lookup fails, for ``cmd + '.bat'``.

    Raises ImportError if pywin32 (``win32api``) is not installed.  Any error
    raised by the final ``.bat`` lookup propagates to the caller.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    search_path = os.environ['PATH']
    try:
        path, _ = win32api.SearchPath(search_path, cmd + '.exe')
    except Exception:
        # Deliberately not a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are not swallowed while falling back to the .bat lookup.
        path, _ = win32api.SearchPath(search_path, cmd + '.bat')
    return path
50 52
class ProcessStateError(Exception):
    """Error used when a process is not in the state an operation requires."""
53 55
class UnknownStatus(Exception):
    """Error used when a process exit status cannot be classified."""
56 58
class LauncherProcessProtocol(ProcessProtocol):
    """
    ProcessProtocol that relays process events to its ProcessLauncher.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # Hand the pid of the transport over to the launcher.
        launcher = self.process_launcher
        launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        # ``status`` wraps the reason the process ended in ``status.value``.
        reason = status.value
        if isinstance(reason, ProcessDone):
            self.process_launcher.fire_stop_deferred(0)
            return
        if isinstance(reason, ProcessTerminated):
            exit_info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
            }
            self.process_launcher.fire_stop_deferred(exit_info)
            return
        raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Data from the child's stdout goes to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Data from the child's stderr is logged as an error.
        log.err(data)
86 88
class ProcessLauncher(object):
    """
    Launch an external process and follow its lifecycle asynchronously.

    Deferreds are currently used to notify other parties of process state
    changes.  This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """

    def __init__(self, cmd_and_args):
        # The executable is the first element; the whole list is the argv.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        """Put every piece of bookkeeping back to its pre-start value."""
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        """True only while the state is 'running'."""
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        """Record the pid, mark the process running and fire the start deferred."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """
        Spawn the process and return the start deferred.

        If the state is not 'before', a deferred that has already failed
        with ProcessStateError is returned instead.
        """
        if self.state != 'before':
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """
        Return a new deferred that is fired when the process stops.

        Once the state is 'after', a deferred that has already failed with
        ProcessStateError is returned instead.
        """
        if self.state not in ('running', 'before'):
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))
        stop_d = defer.Deferred()
        self.stop_deferreds.append(stop_d)
        return stop_d

    def fire_stop_deferred(self, exit_code):
        """Mark the process as finished and fire every registered stop deferred."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for stop_d in self.stop_deferreds:
            stop_d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Nothing is sent unless the state is 'running'.
        """
        if not self.state == 'running':
            return
        self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """Send 'INT' now and schedule 'KILL' after ``delay`` seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
166 168
167 169
168 170 #-----------------------------------------------------------------------------
169 171 # Code for launching controller and engines
170 172 #-----------------------------------------------------------------------------
171 173
172 174
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run the ipcontroller script."""

    def __init__(self, extra_args=None):
        """
        Build the controller command line.

        extra_args, when not None, is a list of additional command-line
        arguments appended after the base command.
        """
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            # Map a compiled module file (.pyc or .pyo) back to its .py
            # source by rewriting only the extension, never other parts of
            # the path.
            script_location = os.path.splitext(ipcontroller.__file__)[0] + '.py'
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted
            args = [find_exe('python'), '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
185 193
186 194
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run the ipengine script."""

    def __init__(self, extra_args=None):
        """
        Build the engine command line.

        extra_args, when not None, is a list of additional command-line
        arguments appended after the base command.
        """
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            # Map a compiled module file (.pyc or .pyo) back to its .py
            # source by rewriting only the extension, never other parts of
            # the path.
            script_location = os.path.splitext(ipengine.__file__)[0] + '.py'
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted
            args = [find_exe('python'), '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
199 213
200 214
class LocalEngineSet(object):
    """A group of EngineLauncher instances that are started and stopped together."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start ``n`` engines; return a deferred gathering every start result."""
        start_deferreds = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            start_d = launcher.start()
            self.launchers.append(launcher)
            start_deferreds.append(start_d)
        gathered = gatherBoth(start_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_start)
        return gathered

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send ``sig`` to every engine; return a deferred gathering the stop results."""
        stop_deferreds = []
        for launcher in self.launchers:
            # Register for the stop notification before delivering the signal.
            stop_deferreds.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        gathered = gatherBoth(stop_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_stop)
        return gathered

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt then kill every engine; return a deferred gathering the stop results."""
        stop_deferreds = []
        for launcher in self.launchers:
            # Register for the stop notification before interrupting.
            stop_deferreds.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        gathered = gatherBoth(stop_deferreds, consumeErrors=True)
        gathered.addCallback(self._handle_stop)
        return gathered
245 259
246 260
class BatchEngineSet(object):
    """
    Base class for engine sets that are started through a batch system.

    A batch script is generated from a template file, handed to
    ``submit_command`` and the job id is parsed from the command's output.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        """
        template_file is the path of the batch script template; the keyword
        arguments become the namespace used to fill the template in.
        """
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        # The instantiated script is written next to the template.
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """
        Extract, store and return the job id found in ``output``.

        Raises Exception if ``job_id_regexp`` does not match at the start of
        ``output``.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the template with ``n`` in its context and write the batch file."""
        self.context['n'] = n
        # try/finally guarantees that both file handles are closed, even when
        # reading or writing fails.
        template_handle = open(self.template_file, 'r')
        try:
            template = template_handle.read()
        finally:
            template_handle.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        batch_handle = open(self.batch_file,'w')
        try:
            batch_handle.write(script_as_string)
        finally:
            batch_handle.close()

    def handle_error(self, f):
        """Errback: print the traceback of failure ``f`` and re-raise it."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """
        Write the batch script for ``n`` and submit it.

        Returns the deferred of the submit command, with parse_job_id added
        as callback and handle_error as errback.
        """
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
            [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """
        Run ``delete_command`` on the stored job id; return its deferred.

        ``self.job_id`` is only set by parse_job_id.
        """
        d = getProcessOutput(self.delete_command,
            [self.job_id],env=os.environ)
        return d
296 310
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet that submits with ``qsub`` and deletes with ``qdel``."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string: '\d' in a plain string literal is an invalid escape sequence.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
305 319
306 320
307 321 #-----------------------------------------------------------------------------
308 322 # Main functions for the different types of clusters
309 323 #-----------------------------------------------------------------------------
310 324
311 325 # TODO:
312 326 # The logic in these functions should be moved into classes like LocalCluster,
313 327 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
314 328 # The main functions should then just parse the command line arguments, create
315 329 # the appropriate class and call a 'start' method.
316 330
317 def main_local(args):
318 cont_args = []
319 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
def check_security(args, cont_args):
    """
    Validate the security flags and add them to the controller arguments.

    When either ``args.x`` or ``args.y`` is unset and ``have_crypto`` is
    false, an error is logged, the reactor is stopped and False is returned.
    Otherwise '-x' and/or '-y' are appended to ``cont_args`` (in place) for
    the flags that are set, and True is returned.
    """
    security_requested = not args.x or not args.y
    if security_requested and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    for enabled, flag in ((args.x, '-x'), (args.y, '-y')):
        if enabled:
            cont_args.append(flag)
    return True
343
def main_local(args):
    """
    Start a controller and ``args.n`` engines through the local launchers.

    Returns early, without starting anything, when check_security fails.
    """
    cont_args = ['--logfile=%s' % pjoin(args.logdir,'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # The controller pid is made part of the engine log file name.
        engine_logfile = pjoin(args.logdir,'ipengine%s-' % cont_pid)
        eset = LocalEngineSet(extra_args=['--logfile=%s' % engine_logfile])

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT,shutdown)
        return eset.start(args.n)

    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
348 376
def main_mpirun(args):
    """
    Start a controller, then launch ``args.n`` engines with one mpirun command.

    Returns early, without starting anything, when check_security fails.
    """
    cont_args = ['--logfile=%s' % pjoin(args.logdir,'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Command line: mpirun -n <n> ipengine -l <logfile prefix> [--mpi=...]
        raw_args = [
            'mpirun',
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir,'ipengine%s-' % cont_pid),
        ]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT,shutdown)
        return eset.start()

    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
384 413
def main_pbs(args):
    """
    Start a controller, then submit the engines as a PBS job.

    Returns early, without starting anything, when check_security fails.
    """
    cont_args = ['--logfile=%s' % pjoin(args.logdir,'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)

        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            # Delete the job first; whatever the outcome, stop the
            # controller and then the reactor.
            kill_d = pbs_set.kill()
            kill_d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            kill_d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT,shutdown)
        return pbs_set.start(args.n)

    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.raiseException())
406 436
407 437
def get_args():
    """
    Build the ipcluster argument parser and parse the command line.

    The options -x, -y, --logdir and -n/--num are shared by the three
    sub-commands 'local', 'mpirun' and 'pbs'.  Each sub-command stores its
    main function as ``func`` on the returned namespace.
    """
    # Options common to every cluster type; used as a parent parser below.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    # Adjacent string literals are used instead of backslash continuations
    # so that source indentation does not end up inside the description.
    parser = argparse.ArgumentParser(
        description=(
            'IPython cluster startup. This starts a controller and '
            'engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND '
            'THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
        )
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpirun)

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)
    args = parser.parse_args()
    return args
481 511
def main():
    """Parse the command line, schedule the selected main function and run the reactor."""
    parsed = get_args()
    # The chosen cluster function is called with the parsed arguments once
    # the reactor is running.
    reactor.callWhenRunning(parsed.func, parsed)
    log.startLogging(sys.stdout)
    reactor.run()
487 517
488 518 if __name__ == '__main__':
489 519 main()
General Comments 0
You need to be logged in to leave comments. Login now