##// END OF EJS Templates
Fixing small things in response to review.
Brian Granger -
Show More
1 NO CONTENT: modified file
@@ -1,519 +1,521
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 pjoin = os.path.join
22 22
23 23 from twisted.internet import reactor, defer
24 24 from twisted.internet.protocol import ProcessProtocol
25 from twisted.python import failure, log
26 25 from twisted.internet.error import ProcessDone, ProcessTerminated
27 26 from twisted.internet.utils import getProcessOutput
27 from twisted.python import failure, log
28 28
29 29 from IPython.external import argparse
30 30 from IPython.external import Itpl
31 from IPython.kernel.twistedutil import gatherBoth
32 from IPython.kernel.util import printer
33 31 from IPython.genutils import get_ipython_dir, num_cpus
34 32 from IPython.kernel.fcutil import have_crypto
35 33 from IPython.kernel.error import SecurityError
34 from IPython.kernel.fcutil import have_crypto
35 from IPython.kernel.twistedutil import gatherBoth
36 from IPython.kernel.util import printer
37
36 38
37 39 #-----------------------------------------------------------------------------
38 40 # General process handling code
39 41 #-----------------------------------------------------------------------------
40 42
41 43 def find_exe(cmd):
42 44 try:
43 45 import win32api
44 46 except ImportError:
45 47 raise ImportError('you need to have pywin32 installed for this to work')
46 48 else:
47 49 try:
48 50 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
49 51 except:
50 52 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
51 53 return path
52 54
53 55 class ProcessStateError(Exception):
54 56 pass
55 57
56 58 class UnknownStatus(Exception):
57 59 pass
58 60
59 61 class LauncherProcessProtocol(ProcessProtocol):
60 62 """
61 63 A ProcessProtocol to go with the ProcessLauncher.
62 64 """
63 65 def __init__(self, process_launcher):
64 66 self.process_launcher = process_launcher
65 67
66 68 def connectionMade(self):
67 69 self.process_launcher.fire_start_deferred(self.transport.pid)
68 70
69 71 def processEnded(self, status):
70 72 value = status.value
71 73 if isinstance(value, ProcessDone):
72 74 self.process_launcher.fire_stop_deferred(0)
73 75 elif isinstance(value, ProcessTerminated):
74 76 self.process_launcher.fire_stop_deferred(
75 77 {'exit_code':value.exitCode,
76 78 'signal':value.signal,
77 79 'status':value.status
78 80 }
79 81 )
80 82 else:
81 83 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
82 84
83 85 def outReceived(self, data):
84 86 log.msg(data)
85 87
86 88 def errReceived(self, data):
87 89 log.err(data)
88 90
89 91 class ProcessLauncher(object):
90 92 """
91 93 Start and stop an external process in an asynchronous manner.
92 94
93 95 Currently this uses deferreds to notify other parties of process state
94 96 changes. This is an awkward design and should be moved to using
95 97 a formal NotificationCenter.
96 98 """
97 99 def __init__(self, cmd_and_args):
98 100 self.cmd = cmd_and_args[0]
99 101 self.args = cmd_and_args
100 102 self._reset()
101 103
102 104 def _reset(self):
103 105 self.process_protocol = None
104 106 self.pid = None
105 107 self.start_deferred = None
106 108 self.stop_deferreds = []
107 109 self.state = 'before' # before, running, or after
108 110
109 111 @property
110 112 def running(self):
111 113 if self.state == 'running':
112 114 return True
113 115 else:
114 116 return False
115 117
116 118 def fire_start_deferred(self, pid):
117 119 self.pid = pid
118 120 self.state = 'running'
119 121 log.msg('Process %r has started with pid=%i' % (self.args, pid))
120 122 self.start_deferred.callback(pid)
121 123
122 124 def start(self):
123 125 if self.state == 'before':
124 126 self.process_protocol = LauncherProcessProtocol(self)
125 127 self.start_deferred = defer.Deferred()
126 128 self.process_transport = reactor.spawnProcess(
127 129 self.process_protocol,
128 130 self.cmd,
129 131 self.args,
130 132 env=os.environ
131 133 )
132 134 return self.start_deferred
133 135 else:
134 136 s = 'the process has already been started and has state: %r' % \
135 137 self.state
136 138 return defer.fail(ProcessStateError(s))
137 139
138 140 def get_stop_deferred(self):
139 141 if self.state == 'running' or self.state == 'before':
140 142 d = defer.Deferred()
141 143 self.stop_deferreds.append(d)
142 144 return d
143 145 else:
144 146 s = 'this process is already complete'
145 147 return defer.fail(ProcessStateError(s))
146 148
147 149 def fire_stop_deferred(self, exit_code):
148 150 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
149 151 self.state = 'after'
150 152 for d in self.stop_deferreds:
151 153 d.callback(exit_code)
152 154
153 155 def signal(self, sig):
154 156 """
155 157 Send a signal to the process.
156 158
157 159 The argument sig can be ('KILL','INT', etc.) or any signal number.
158 160 """
159 161 if self.state == 'running':
160 162 self.process_transport.signalProcess(sig)
161 163
162 164 # def __del__(self):
163 165 # self.signal('KILL')
164 166
165 167 def interrupt_then_kill(self, delay=1.0):
166 168 self.signal('INT')
167 169 reactor.callLater(delay, self.signal, 'KILL')
168 170
169 171
170 172 #-----------------------------------------------------------------------------
171 173 # Code for launching controller and engines
172 174 #-----------------------------------------------------------------------------
173 175
174 176
175 177 class ControllerLauncher(ProcessLauncher):
176 178
177 179 def __init__(self, extra_args=None):
178 180 if sys.platform == 'win32':
179 181 # This logic is needed because the ipcontroller script doesn't
180 182 # always get installed in the same way or in the same location.
181 183 from IPython.kernel.scripts import ipcontroller
182 184 script_location = ipcontroller.__file__.replace('.pyc', '.py')
183 185 # The -u option here turns on unbuffered output, which is required
184 186 # on Win32 to prevent wierd conflict and problems with Twisted
185 187 args = [find_exe('python'), '-u', script_location]
186 188 else:
187 189 args = ['ipcontroller']
188 190 self.extra_args = extra_args
189 191 if extra_args is not None:
190 192 args.extend(extra_args)
191 193
192 194 ProcessLauncher.__init__(self, args)
193 195
194 196
195 197 class EngineLauncher(ProcessLauncher):
196 198
197 199 def __init__(self, extra_args=None):
198 200 if sys.platform == 'win32':
199 201 # This logic is needed because the ipcontroller script doesn't
200 202 # always get installed in the same way or in the same location.
201 203 from IPython.kernel.scripts import ipengine
202 204 script_location = ipengine.__file__.replace('.pyc', '.py')
203 205 # The -u option here turns on unbuffered output, which is required
204 206 # on Win32 to prevent wierd conflict and problems with Twisted
205 207 args = [find_exe('python'), '-u', script_location]
206 208 else:
207 209 args = ['ipengine']
208 210 self.extra_args = extra_args
209 211 if extra_args is not None:
210 212 args.extend(extra_args)
211 213
212 214 ProcessLauncher.__init__(self, args)
213 215
214 216
215 217 class LocalEngineSet(object):
216 218
217 219 def __init__(self, extra_args=None):
218 220 self.extra_args = extra_args
219 221 self.launchers = []
220 222
221 223 def start(self, n):
222 224 dlist = []
223 225 for i in range(n):
224 226 el = EngineLauncher(extra_args=self.extra_args)
225 227 d = el.start()
226 228 self.launchers.append(el)
227 229 dlist.append(d)
228 230 dfinal = gatherBoth(dlist, consumeErrors=True)
229 231 dfinal.addCallback(self._handle_start)
230 232 return dfinal
231 233
232 234 def _handle_start(self, r):
233 235 log.msg('Engines started with pids: %r' % r)
234 236 return r
235 237
236 238 def _handle_stop(self, r):
237 239 log.msg('Engines received signal: %r' % r)
238 240 return r
239 241
240 242 def signal(self, sig):
241 243 dlist = []
242 244 for el in self.launchers:
243 245 d = el.get_stop_deferred()
244 246 dlist.append(d)
245 247 el.signal(sig)
246 248 dfinal = gatherBoth(dlist, consumeErrors=True)
247 249 dfinal.addCallback(self._handle_stop)
248 250 return dfinal
249 251
250 252 def interrupt_then_kill(self, delay=1.0):
251 253 dlist = []
252 254 for el in self.launchers:
253 255 d = el.get_stop_deferred()
254 256 dlist.append(d)
255 257 el.interrupt_then_kill(delay)
256 258 dfinal = gatherBoth(dlist, consumeErrors=True)
257 259 dfinal.addCallback(self._handle_stop)
258 260 return dfinal
259 261
260 262
261 263 class BatchEngineSet(object):
262 264
263 265 # Subclasses must fill these in. See PBSEngineSet
264 266 submit_command = ''
265 267 delete_command = ''
266 268 job_id_regexp = ''
267 269
268 270 def __init__(self, template_file, **kwargs):
269 271 self.template_file = template_file
270 272 self.context = {}
271 273 self.context.update(kwargs)
272 274 self.batch_file = self.template_file+'-run'
273 275
274 276 def parse_job_id(self, output):
275 277 m = re.match(self.job_id_regexp, output)
276 278 if m is not None:
277 279 job_id = m.group()
278 280 else:
279 281 raise Exception("job id couldn't be determined: %s" % output)
280 282 self.job_id = job_id
281 283 log.msg('Job started with job id: %r' % job_id)
282 284 return job_id
283 285
284 286 def write_batch_script(self, n):
285 287 self.context['n'] = n
286 288 template = open(self.template_file, 'r').read()
287 289 log.msg('Using template for batch script: %s' % self.template_file)
288 290 script_as_string = Itpl.itplns(template, self.context)
289 291 log.msg('Writing instantiated batch script: %s' % self.batch_file)
290 292 f = open(self.batch_file,'w')
291 293 f.write(script_as_string)
292 294 f.close()
293 295
294 296 def handle_error(self, f):
295 297 f.printTraceback()
296 298 f.raiseException()
297 299
298 300 def start(self, n):
299 301 self.write_batch_script(n)
300 302 d = getProcessOutput(self.submit_command,
301 303 [self.batch_file],env=os.environ)
302 304 d.addCallback(self.parse_job_id)
303 305 d.addErrback(self.handle_error)
304 306 return d
305 307
306 308 def kill(self):
307 309 d = getProcessOutput(self.delete_command,
308 310 [self.job_id],env=os.environ)
309 311 return d
310 312
311 313 class PBSEngineSet(BatchEngineSet):
312 314
313 315 submit_command = 'qsub'
314 316 delete_command = 'qdel'
315 317 job_id_regexp = '\d+'
316 318
317 319 def __init__(self, template_file, **kwargs):
318 320 BatchEngineSet.__init__(self, template_file, **kwargs)
319 321
320 322
321 323 #-----------------------------------------------------------------------------
322 324 # Main functions for the different types of clusters
323 325 #-----------------------------------------------------------------------------
324 326
325 327 # TODO:
326 328 # The logic in these codes should be moved into classes like LocalCluster
327 329 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
328 330 # The main functions should then just parse the command line arguments, create
329 331 # the appropriate class and call a 'start' method.
330 332
331 333 def check_security(args, cont_args):
332 334 if (not args.x or not args.y) and not have_crypto:
333 335 log.err("""
334 336 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
335 337 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
336 338 reactor.stop()
337 339 return False
338 340 if args.x:
339 341 cont_args.append('-x')
340 342 if args.y:
341 343 cont_args.append('-y')
342 344 return True
343 345
344 346 def main_local(args):
345 347 cont_args = []
346 348 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
347 349
348 350 # Check security settings before proceeding
349 keep_going = check_security(args, cont_args)
350 if not keep_going: return
351 if not check_security(args, cont_args):
352 return
351 353
352 354 cl = ControllerLauncher(extra_args=cont_args)
353 355 dstart = cl.start()
354 356 def start_engines(cont_pid):
355 357 engine_args = []
356 358 engine_args.append('--logfile=%s' % \
357 359 pjoin(args.logdir,'ipengine%s-' % cont_pid))
358 360 eset = LocalEngineSet(extra_args=engine_args)
359 361 def shutdown(signum, frame):
360 362 log.msg('Stopping local cluster')
361 363 # We are still playing with the times here, but these seem
362 364 # to be reliable in allowing everything to exit cleanly.
363 365 eset.interrupt_then_kill(0.5)
364 366 cl.interrupt_then_kill(0.5)
365 367 reactor.callLater(1.0, reactor.stop)
366 368 signal.signal(signal.SIGINT,shutdown)
367 369 d = eset.start(args.n)
368 370 return d
369 371 def delay_start(cont_pid):
370 372 # This is needed because the controller doesn't start listening
371 373 # right when it starts and the controller needs to write
372 374 # furl files for the engine to pick up
373 375 reactor.callLater(1.0, start_engines, cont_pid)
374 376 dstart.addCallback(delay_start)
375 377 dstart.addErrback(lambda f: f.raiseException())
376 378
377 379 def main_mpirun(args):
378 380 cont_args = []
379 381 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
380 382
381 383 # Check security settings before proceeding
382 keep_going = check_security(args, cont_args)
383 if not keep_going: return
384 if not check_security(args, cont_args):
385 return
384 386
385 387 cl = ControllerLauncher(extra_args=cont_args)
386 388 dstart = cl.start()
387 389 def start_engines(cont_pid):
388 390 raw_args = ['mpirun']
389 391 raw_args.extend(['-n',str(args.n)])
390 392 raw_args.append('ipengine')
391 393 raw_args.append('-l')
392 394 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
393 395 if args.mpi:
394 396 raw_args.append('--mpi=%s' % args.mpi)
395 397 eset = ProcessLauncher(raw_args)
396 398 def shutdown(signum, frame):
397 399 log.msg('Stopping local cluster')
398 400 # We are still playing with the times here, but these seem
399 401 # to be reliable in allowing everything to exit cleanly.
400 402 eset.interrupt_then_kill(1.0)
401 403 cl.interrupt_then_kill(1.0)
402 404 reactor.callLater(2.0, reactor.stop)
403 405 signal.signal(signal.SIGINT,shutdown)
404 406 d = eset.start()
405 407 return d
406 408 def delay_start(cont_pid):
407 409 # This is needed because the controller doesn't start listening
408 410 # right when it starts and the controller needs to write
409 411 # furl files for the engine to pick up
410 412 reactor.callLater(1.0, start_engines, cont_pid)
411 413 dstart.addCallback(delay_start)
412 414 dstart.addErrback(lambda f: f.raiseException())
413 415
414 416 def main_pbs(args):
415 417 cont_args = []
416 418 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
417 419
418 420 # Check security settings before proceeding
419 keep_going = check_security(args, cont_args)
420 if not keep_going: return
421 if not check_security(args, cont_args):
422 return
421 423
422 424 cl = ControllerLauncher(extra_args=cont_args)
423 425 dstart = cl.start()
424 426 def start_engines(r):
425 427 pbs_set = PBSEngineSet(args.pbsscript)
426 428 def shutdown(signum, frame):
427 429 log.msg('Stopping pbs cluster')
428 430 d = pbs_set.kill()
429 431 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
430 432 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
431 433 signal.signal(signal.SIGINT,shutdown)
432 434 d = pbs_set.start(args.n)
433 435 return d
434 436 dstart.addCallback(start_engines)
435 437 dstart.addErrback(lambda f: f.raiseException())
436 438
437 439
438 440 def get_args():
439 441 base_parser = argparse.ArgumentParser(add_help=False)
440 442 base_parser.add_argument(
441 443 '-x',
442 444 action='store_true',
443 445 dest='x',
444 446 help='turn off client security'
445 447 )
446 448 base_parser.add_argument(
447 449 '-y',
448 450 action='store_true',
449 451 dest='y',
450 452 help='turn off engine security'
451 453 )
452 454 base_parser.add_argument(
453 455 "--logdir",
454 456 type=str,
455 457 dest="logdir",
456 458 help="directory to put log files (default=$IPYTHONDIR/log)",
457 459 default=pjoin(get_ipython_dir(),'log')
458 460 )
459 461 base_parser.add_argument(
460 462 "-n",
461 463 "--num",
462 464 type=int,
463 465 dest="n",
464 466 default=2,
465 467 help="the number of engines to start"
466 468 )
467 469
468 470 parser = argparse.ArgumentParser(
469 471 description='IPython cluster startup. This starts a controller and\
470 472 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
471 473 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
472 474 )
473 475 subparsers = parser.add_subparsers(
474 476 help='available cluster types. For help, do "ipcluster TYPE --help"')
475 477
476 478 parser_local = subparsers.add_parser(
477 479 'local',
478 480 help='run a local cluster',
479 481 parents=[base_parser]
480 482 )
481 483 parser_local.set_defaults(func=main_local)
482 484
483 485 parser_mpirun = subparsers.add_parser(
484 486 'mpirun',
485 487 help='run a cluster using mpirun',
486 488 parents=[base_parser]
487 489 )
488 490 parser_mpirun.add_argument(
489 491 "--mpi",
490 492 type=str,
491 493 dest="mpi", # Don't put a default here to allow no MPI support
492 494 help="how to call MPI_Init (default=mpi4py)"
493 495 )
494 496 parser_mpirun.set_defaults(func=main_mpirun)
495 497
496 498 parser_pbs = subparsers.add_parser(
497 499 'pbs',
498 500 help='run a pbs cluster',
499 501 parents=[base_parser]
500 502 )
501 503 parser_pbs.add_argument(
502 504 '--pbs-script',
503 505 type=str,
504 506 dest='pbsscript',
505 507 help='PBS script template',
506 508 default='pbs.template'
507 509 )
508 510 parser_pbs.set_defaults(func=main_pbs)
509 511 args = parser.parse_args()
510 512 return args
511 513
512 514 def main():
513 515 args = get_args()
514 516 reactor.callWhenRunning(args.func, args)
515 517 log.startLogging(sys.stdout)
516 518 reactor.run()
517 519
518 520 if __name__ == '__main__':
519 521 main()
General Comments 0
You need to be logged in to leave comments. Login now