More changes to the PBS batch cluster.
Brian Granger
@@ -1,451 +1,455 @@
1 1 import os
2 2 import re
3 3 import sys
4 4 import signal
5 5 pjoin = os.path.join
6 6
7 7 from twisted.internet import reactor, defer
8 8 from twisted.internet.protocol import ProcessProtocol
9 9 from twisted.python import failure, log
10 10 from twisted.internet.error import ProcessDone, ProcessTerminated
11 11 from twisted.internet.utils import getProcessOutput
12 12
13 13 from IPython.external import argparse
14 14 from IPython.external import Itpl
15 15 from IPython.kernel.twistedutil import gatherBoth
16 16 from IPython.kernel.util import printer
17 17 from IPython.genutils import get_ipython_dir, num_cpus
18 18
19 19 def find_exe(cmd):
20 20 try:
21 21 import win32api
22 22 except ImportError:
23 23 raise ImportError('you need to have pywin32 installed for this to work')
24 24 else:
25 25 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd)
26 26 return path
27 27
28 28 # Test local cluster on Win32
29 29 # Look at local cluster usage strings
30 30 # PBS stuff
31 31
32 32 class ProcessStateError(Exception):
33 33 pass
34 34
35 35 class UnknownStatus(Exception):
36 36 pass
37 37
38 38 class LauncherProcessProtocol(ProcessProtocol):
39 39 """
40 40 A ProcessProtocol to go with the ProcessLauncher.
41 41 """
42 42 def __init__(self, process_launcher):
43 43 self.process_launcher = process_launcher
44 44
45 45 def connectionMade(self):
46 46 self.process_launcher.fire_start_deferred(self.transport.pid)
47 47
48 48 def processEnded(self, status):
49 49 value = status.value
50 50 if isinstance(value, ProcessDone):
51 51 self.process_launcher.fire_stop_deferred(0)
52 52 elif isinstance(value, ProcessTerminated):
53 53 self.process_launcher.fire_stop_deferred(
54 54 {'exit_code':value.exitCode,
55 55 'signal':value.signal,
56 56 'status':value.status
57 57 }
58 58 )
59 59 else:
60 60 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
61 61
62 62 def outReceived(self, data):
63 63 log.msg(data)
64 64
65 65 def errReceived(self, data):
66 66 log.err(data)
67 67
68 68 class ProcessLauncher(object):
69 69 """
70 70 Start and stop an external process in an asynchronous manner.
71 71
72 72 Currently this uses deferreds to notify other parties of process state
73 73 changes. This is an awkward design and should be moved to using
74 74 a formal NotificationCenter.
75 75 """
76 76 def __init__(self, cmd_and_args):
77 77 self.cmd = cmd_and_args[0]
78 78 self.args = cmd_and_args
79 79 self._reset()
80 80
81 81 def _reset(self):
82 82 self.process_protocol = None
83 83 self.pid = None
84 84 self.start_deferred = None
85 85 self.stop_deferreds = []
86 86 self.state = 'before' # before, running, or after
87 87
88 88 @property
89 89 def running(self):
90 90 if self.state == 'running':
91 91 return True
92 92 else:
93 93 return False
94 94
95 95 def fire_start_deferred(self, pid):
96 96 self.pid = pid
97 97 self.state = 'running'
98 98 log.msg('Process %r has started with pid=%i' % (self.args, pid))
99 99 self.start_deferred.callback(pid)
100 100
101 101 def start(self):
102 102 if self.state == 'before':
103 103 self.process_protocol = LauncherProcessProtocol(self)
104 104 self.start_deferred = defer.Deferred()
105 105 self.process_transport = reactor.spawnProcess(
106 106 self.process_protocol,
107 107 self.cmd,
108 108 self.args,
109 109 env=os.environ
110 110 )
111 111 return self.start_deferred
112 112 else:
113 113 s = 'the process has already been started and has state: %r' % \
114 114 self.state
115 115 return defer.fail(ProcessStateError(s))
116 116
117 117 def get_stop_deferred(self):
118 118 if self.state == 'running' or self.state == 'before':
119 119 d = defer.Deferred()
120 120 self.stop_deferreds.append(d)
121 121 return d
122 122 else:
123 123 s = 'this process is already complete'
124 124 return defer.fail(ProcessStateError(s))
125 125
126 126 def fire_stop_deferred(self, exit_code):
127 127 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
128 128 self.state = 'after'
129 129 for d in self.stop_deferreds:
130 130 d.callback(exit_code)
131 131
132 132 def signal(self, sig):
133 133 """
134 134 Send a signal to the process.
135 135
136 136 The argument sig can be a signal name ('KILL', 'INT', etc.) or any signal number.
137 137 """
138 138 if self.state == 'running':
139 139 self.process_transport.signalProcess(sig)
140 140
141 141 # def __del__(self):
142 142 # self.signal('KILL')
143 143
144 144 def interrupt_then_kill(self, delay=1.0):
145 145 self.signal('INT')
146 146 reactor.callLater(delay, self.signal, 'KILL')
147 147
148 148
149 149 class ControllerLauncher(ProcessLauncher):
150 150
151 151 def __init__(self, extra_args=None):
152 152 if sys.platform == 'win32':
153 153 args = [find_exe('ipcontroller.bat')]
154 154 else:
155 155 args = ['ipcontroller']
156 156 self.extra_args = extra_args
157 157 if extra_args is not None:
158 158 args.extend(extra_args)
159 159
160 160 ProcessLauncher.__init__(self, args)
161 161
162 162
163 163 class EngineLauncher(ProcessLauncher):
164 164
165 165 def __init__(self, extra_args=None):
166 166 if sys.platform == 'win32':
167 167 args = [find_exe('ipengine.bat')]
168 168 else:
169 169 args = ['ipengine']
170 170 self.extra_args = extra_args
171 171 if extra_args is not None:
172 172 args.extend(extra_args)
173 173
174 174 ProcessLauncher.__init__(self, args)
175 175
176 176
177 177 class LocalEngineSet(object):
178 178
179 179 def __init__(self, extra_args=None):
180 180 self.extra_args = extra_args
181 181 self.launchers = []
182 182
183 183 def start(self, n):
184 184 dlist = []
185 185 for i in range(n):
186 186 el = EngineLauncher(extra_args=self.extra_args)
187 187 d = el.start()
188 188 self.launchers.append(el)
189 189 dlist.append(d)
190 190 dfinal = gatherBoth(dlist, consumeErrors=True)
191 191 dfinal.addCallback(self._handle_start)
192 192 return dfinal
193 193
194 194 def _handle_start(self, r):
195 195 log.msg('Engines started with pids: %r' % r)
196 196 return r
197 197
198 198 def _handle_stop(self, r):
199 199 log.msg('Engines received signal: %r' % r)
200 200 return r
201 201
202 202 def signal(self, sig):
203 203 dlist = []
204 204 for el in self.launchers:
205 205 d = el.get_stop_deferred()
206 206 dlist.append(d)
207 207 el.signal(sig)
208 208 dfinal = gatherBoth(dlist, consumeErrors=True)
209 209 dfinal.addCallback(self._handle_stop)
210 210 return dfinal
211 211
212 212 def interrupt_then_kill(self, delay=1.0):
213 213 dlist = []
214 214 for el in self.launchers:
215 215 d = el.get_stop_deferred()
216 216 dlist.append(d)
217 217 el.interrupt_then_kill(delay)
218 218 dfinal = gatherBoth(dlist, consumeErrors=True)
219 219 dfinal.addCallback(self._handle_stop)
220 220 return dfinal
221 221
222 222
223 223 class BatchEngineSet(object):
224 224
225 225 # Subclasses must fill these in. See PBSEngineSet
226 226 submit_command = ''
227 227 delete_command = ''
228 228 job_id_regexp = ''
229 229
230 230 def __init__(self, template_file, **kwargs):
231 231 self.template_file = template_file
232 232 self.context = {}
233 233 self.context.update(kwargs)
234 self.batch_file = 'batch-script'
234 self.batch_file = self.template_file+'-run'
235 235
236 236 def parse_job_id(self, output):
237 237 m = re.match(self.job_id_regexp, output)
238 238 if m is not None:
239 239 job_id = m.group()
240 240 else:
241 241 raise Exception("job id couldn't be determined: %s" % output)
242 242 self.job_id = job_id
243 print 'Job started with job id:', job_id
243 log.msg('Job started with job id: %r' % job_id)
244 244 return job_id
245 245
246 246 def write_batch_script(self, n):
247 print 'n', n
248 247 self.context['n'] = n
249 248 template = open(self.template_file, 'r').read()
250 print 'template', template
251 log.msg(template)
249 log.msg('Using template for batch script: %s' % self.template_file)
252 250 log.msg(repr(self.context))
253 251 script_as_string = Itpl.itplns(template, self.context)
254 log.msg(script_as_string)
252 log.msg('Writing instantiated batch script: %s' % self.batch_file)
255 253 f = open(self.batch_file,'w')
256 254 f.write(script_as_string)
257 255 f.close()
258 256
259 257 def handle_error(self, f):
260 258 f.printTraceback()
261 #f.raiseException()
259 f.raiseException()
262 260
263 261 def start(self, n):
264 262 self.write_batch_script(n)
265 263 d = getProcessOutput(self.submit_command,
266 264 [self.batch_file],env=os.environ)
267 265 d.addCallback(self.parse_job_id)
268 266 d.addErrback(self.handle_error)
269 267 return d
270 268
271 269 def kill(self):
272 270 d = getProcessOutput(self.delete_command,
273 271 [self.job_id],env=os.environ)
274 272 return d
275 273
276 274 class PBSEngineSet(BatchEngineSet):
277 275
278 276 submit_command = 'qsub'
279 277 delete_command = 'qdel'
280 278 job_id_regexp = '\d+'
281 279
282 280 def __init__(self, template_file, **kwargs):
283 281 BatchEngineSet.__init__(self, template_file, **kwargs)
284 282
285 283
286 284 def main_local(args):
287 285 cont_args = []
288 286 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
289 287 if args.x:
290 288 cont_args.append('-x')
291 289 if args.y:
292 290 cont_args.append('-y')
293 291 cl = ControllerLauncher(extra_args=cont_args)
294 292 dstart = cl.start()
295 293 def start_engines(cont_pid):
296 294 engine_args = []
297 295 engine_args.append('--logfile=%s' % \
298 296 pjoin(args.logdir,'ipengine%s-' % cont_pid))
299 297 eset = LocalEngineSet(extra_args=engine_args)
300 298 def shutdown(signum, frame):
301 299 log.msg('Stopping local cluster')
302 300 # We are still playing with the times here, but these seem
303 301 # to be reliable in allowing everything to exit cleanly.
304 302 eset.interrupt_then_kill(0.5)
305 303 cl.interrupt_then_kill(0.5)
306 304 reactor.callLater(1.0, reactor.stop)
307 305 signal.signal(signal.SIGINT,shutdown)
308 306 d = eset.start(args.n)
309 307 return d
310 308 def delay_start(cont_pid):
311 309 # This is needed because the controller doesn't start listening
312 310 # immediately, and it needs time to write the furl files
313 311 # that the engines pick up
314 312 reactor.callLater(1.0, start_engines, cont_pid)
315 313 dstart.addCallback(delay_start)
316 314 dstart.addErrback(lambda f: f.raiseException())
317 315
318 316 def main_mpirun(args):
319 317 cont_args = []
320 318 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
321 319 if args.x:
322 320 cont_args.append('-x')
323 321 if args.y:
324 322 cont_args.append('-y')
325 323 cl = ControllerLauncher(extra_args=cont_args)
326 324 dstart = cl.start()
327 325 def start_engines(cont_pid):
328 326 raw_args = ['mpirun']
329 327 raw_args.extend(['-n',str(args.n)])
330 328 raw_args.append('ipengine')
331 329 raw_args.append('-l')
332 330 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
333 331 raw_args.append('--mpi=%s' % args.mpi)
334 332 eset = ProcessLauncher(raw_args)
335 333 def shutdown(signum, frame):
336 334 log.msg('Stopping local cluster')
337 335 # We are still playing with the times here, but these seem
338 336 # to be reliable in allowing everything to exit cleanly.
339 337 eset.interrupt_then_kill(1.0)
340 338 cl.interrupt_then_kill(1.0)
341 339 reactor.callLater(2.0, reactor.stop)
342 340 signal.signal(signal.SIGINT,shutdown)
343 341 d = eset.start()
344 342 return d
345 343 def delay_start(cont_pid):
346 344 # This is needed because the controller doesn't start listening
347 345 # immediately, and it needs time to write the furl files
348 346 # that the engines pick up
349 347 reactor.callLater(1.0, start_engines, cont_pid)
350 348 dstart.addCallback(delay_start)
351 349 dstart.addErrback(lambda f: f.raiseException())
352 350
353 351 def main_pbs(args):
354 352 cont_args = []
355 353 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
356 354 if args.x:
357 355 cont_args.append('-x')
358 356 if args.y:
359 357 cont_args.append('-y')
360 358 cl = ControllerLauncher(extra_args=cont_args)
361 359 dstart = cl.start()
362 360 def start_engines(r):
363 361 pbs_set = PBSEngineSet(args.pbsscript)
362 def shutdown(signum, frame):
363 log.msg('Stopping pbs cluster')
364 d = pbs_set.kill()
365 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
366 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
367 signal.signal(signal.SIGINT,shutdown)
364 368 d = pbs_set.start(args.n)
365 369 return d
366 370 dstart.addCallback(start_engines)
367 371 dstart.addErrback(lambda f: f.raiseException())
368 372
369 373
370 374 def get_args():
371 375 base_parser = argparse.ArgumentParser(add_help=False)
372 376 base_parser.add_argument(
373 377 '-x',
374 378 action='store_true',
375 379 dest='x',
376 380 help='turn off client security'
377 381 )
378 382 base_parser.add_argument(
379 383 '-y',
380 384 action='store_true',
381 385 dest='y',
382 386 help='turn off engine security'
383 387 )
384 388 base_parser.add_argument(
385 389 "--logdir",
386 390 type=str,
387 391 dest="logdir",
388 392 help="directory to put log files (default=$IPYTHONDIR/log)",
389 393 default=pjoin(get_ipython_dir(),'log')
390 394 )
391 395 base_parser.add_argument(
392 396 "-n",
393 397 "--num",
394 398 type=int,
395 399 dest="n",
396 400 default=2,
397 401 help="the number of engines to start"
398 402 )
399 403
400 404 parser = argparse.ArgumentParser(
401 405 description='IPython cluster startup. This starts a controller and\
402 406 engines using various approaches'
403 407 )
404 408 subparsers = parser.add_subparsers(
405 409 help='available cluster types. For help, do "ipcluster TYPE --help"')
406 410
407 411 parser_local = subparsers.add_parser(
408 412 'local',
409 413 help='run a local cluster',
410 414 parents=[base_parser]
411 415 )
412 416 parser_local.set_defaults(func=main_local)
413 417
414 418 parser_mpirun = subparsers.add_parser(
415 419 'mpirun',
416 420 help='run a cluster using mpirun',
417 421 parents=[base_parser]
418 422 )
419 423 parser_mpirun.add_argument(
420 424 "--mpi",
421 425 type=str,
422 426 dest="mpi",
423 427 default='mpi4py',
424 428 help="how to call MPI_Init (default=mpi4py)"
425 429 )
426 430 parser_mpirun.set_defaults(func=main_mpirun)
427 431
428 432 parser_pbs = subparsers.add_parser(
429 433 'pbs',
430 434 help='run a pbs cluster',
431 435 parents=[base_parser]
432 436 )
433 437 parser_pbs.add_argument(
434 438 '--pbs-script',
435 439 type=str,
436 440 dest='pbsscript',
437 441 help='PBS script template',
438 442 default='pbs.template'
439 443 )
440 444 parser_pbs.set_defaults(func=main_pbs)
441 445 args = parser.parse_args()
442 446 return args
443 447
444 448 def main():
445 449 args = get_args()
446 450 reactor.callWhenRunning(args.func, args)
447 451 log.startLogging(sys.stdout)
448 452 reactor.run()
449 453
450 454 if __name__ == '__main__':
451 455 main()
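
For context, the PBS path above works by instantiating a batch-script template: BatchEngineSet.write_batch_script reads the file given by --pbs-script (default pbs.template), fills in the context (currently just n, the number of engines) with Itpl.itplns, writes the result to <template>-run, and submits that file with qsub. The snippet below is a minimal sketch of that interpolation step; the template text and the node count are assumptions for illustration, not something this commit ships.

# Hypothetical example of the interpolation performed by
# BatchEngineSet.write_batch_script, assuming Itpl's $name / ${name}
# substitution syntax. Only 'n' is supplied by the real code
# (self.context['n'] = n); the PBS directives below are an assumed
# template, not part of this changeset.
from IPython.external import Itpl

template = """#PBS -N ipengine
#PBS -l nodes=${n}:ppn=1
mpiexec -n ${n} ipengine
"""
context = {'n': 8}
script_as_string = Itpl.itplns(template, context)
print script_as_string  # this text is written to 'pbs.template-run' and passed to qsub

With a template along these lines on disk, the cluster would be launched with something like "ipcluster pbs -n 8 --pbs-script=pbs.template", using the options defined in get_args above.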