##// END OF EJS Templates
Initial version that works with PBS.
Brian Granger -
Show More
@@ -1,435 +1,451 b''
1 1 import os
2 2 import re
3 3 import sys
4 4 import signal
5 5 pjoin = os.path.join
6 6
7 7 from twisted.internet import reactor, defer
8 8 from twisted.internet.protocol import ProcessProtocol
9 9 from twisted.python import failure, log
10 10 from twisted.internet.error import ProcessDone, ProcessTerminated
11 11 from twisted.internet.utils import getProcessOutput
12 12
13 13 from IPython.external import argparse
14 14 from IPython.external import Itpl
15 15 from IPython.kernel.twistedutil import gatherBoth
16 16 from IPython.kernel.util import printer
17 17 from IPython.genutils import get_ipython_dir, num_cpus
18 18
19 19 def find_exe(cmd):
20 20 try:
21 21 import win32api
22 22 except ImportError:
23 23 raise ImportError('you need to have pywin32 installed for this to work')
24 24 else:
25 25 (path, offest) = win32api.SearchPath(os.environ['PATH'],cmd)
26 26 return path
27 27
28 28 # Test local cluster on Win32
29 29 # Look at local cluster usage strings
30 30 # PBS stuff
31 31
32 32 class ProcessStateError(Exception):
33 33 pass
34 34
35 35 class UnknownStatus(Exception):
36 36 pass
37 37
38 38 class LauncherProcessProtocol(ProcessProtocol):
39 39 """
40 40 A ProcessProtocol to go with the ProcessLauncher.
41 41 """
42 42 def __init__(self, process_launcher):
43 43 self.process_launcher = process_launcher
44 44
45 45 def connectionMade(self):
46 46 self.process_launcher.fire_start_deferred(self.transport.pid)
47 47
48 48 def processEnded(self, status):
49 49 value = status.value
50 50 if isinstance(value, ProcessDone):
51 51 self.process_launcher.fire_stop_deferred(0)
52 52 elif isinstance(value, ProcessTerminated):
53 53 self.process_launcher.fire_stop_deferred(
54 54 {'exit_code':value.exitCode,
55 55 'signal':value.signal,
56 56 'status':value.status
57 57 }
58 58 )
59 59 else:
60 60 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
61 61
62 62 def outReceived(self, data):
63 63 log.msg(data)
64 64
65 65 def errReceived(self, data):
66 66 log.err(data)
67 67
68 68 class ProcessLauncher(object):
69 69 """
70 70 Start and stop an external process in an asynchronous manner.
71 71
72 72 Currently this uses deferreds to notify other parties of process state
73 73 changes. This is an awkward design and should be moved to using
74 74 a formal NotificationCenter.
75 75 """
76 76 def __init__(self, cmd_and_args):
77 77 self.cmd = cmd_and_args[0]
78 78 self.args = cmd_and_args
79 79 self._reset()
80 80
81 81 def _reset(self):
82 82 self.process_protocol = None
83 83 self.pid = None
84 84 self.start_deferred = None
85 85 self.stop_deferreds = []
86 86 self.state = 'before' # before, running, or after
87 87
88 88 @property
89 89 def running(self):
90 90 if self.state == 'running':
91 91 return True
92 92 else:
93 93 return False
94 94
95 95 def fire_start_deferred(self, pid):
96 96 self.pid = pid
97 97 self.state = 'running'
98 98 log.msg('Process %r has started with pid=%i' % (self.args, pid))
99 99 self.start_deferred.callback(pid)
100 100
101 101 def start(self):
102 102 if self.state == 'before':
103 103 self.process_protocol = LauncherProcessProtocol(self)
104 104 self.start_deferred = defer.Deferred()
105 105 self.process_transport = reactor.spawnProcess(
106 106 self.process_protocol,
107 107 self.cmd,
108 108 self.args,
109 109 env=os.environ
110 110 )
111 111 return self.start_deferred
112 112 else:
113 113 s = 'the process has already been started and has state: %r' % \
114 114 self.state
115 115 return defer.fail(ProcessStateError(s))
116 116
117 117 def get_stop_deferred(self):
118 118 if self.state == 'running' or self.state == 'before':
119 119 d = defer.Deferred()
120 120 self.stop_deferreds.append(d)
121 121 return d
122 122 else:
123 123 s = 'this process is already complete'
124 124 return defer.fail(ProcessStateError(s))
125 125
126 126 def fire_stop_deferred(self, exit_code):
127 127 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
128 128 self.state = 'after'
129 129 for d in self.stop_deferreds:
130 130 d.callback(exit_code)
131 131
132 132 def signal(self, sig):
133 133 """
134 134 Send a signal to the process.
135 135
136 136 The argument sig can be ('KILL','INT', etc.) or any signal number.
137 137 """
138 138 if self.state == 'running':
139 139 self.process_transport.signalProcess(sig)
140 140
141 141 # def __del__(self):
142 142 # self.signal('KILL')
143 143
144 144 def interrupt_then_kill(self, delay=1.0):
145 145 self.signal('INT')
146 146 reactor.callLater(delay, self.signal, 'KILL')
147 147
148 148
149 149 class ControllerLauncher(ProcessLauncher):
150 150
151 151 def __init__(self, extra_args=None):
152 152 if sys.platform == 'win32':
153 153 args = [find_exe('ipcontroller.bat')]
154 154 else:
155 155 args = ['ipcontroller']
156 156 self.extra_args = extra_args
157 157 if extra_args is not None:
158 158 args.extend(extra_args)
159 159
160 160 ProcessLauncher.__init__(self, args)
161 161
162 162
163 163 class EngineLauncher(ProcessLauncher):
164 164
165 165 def __init__(self, extra_args=None):
166 166 if sys.platform == 'win32':
167 167 args = [find_exe('ipengine.bat')]
168 168 else:
169 169 args = ['ipengine']
170 170 self.extra_args = extra_args
171 171 if extra_args is not None:
172 172 args.extend(extra_args)
173 173
174 174 ProcessLauncher.__init__(self, args)
175 175
176 176
177 177 class LocalEngineSet(object):
178 178
179 179 def __init__(self, extra_args=None):
180 180 self.extra_args = extra_args
181 181 self.launchers = []
182 182
183 183 def start(self, n):
184 184 dlist = []
185 185 for i in range(n):
186 186 el = EngineLauncher(extra_args=self.extra_args)
187 187 d = el.start()
188 188 self.launchers.append(el)
189 189 dlist.append(d)
190 190 dfinal = gatherBoth(dlist, consumeErrors=True)
191 191 dfinal.addCallback(self._handle_start)
192 192 return dfinal
193 193
194 194 def _handle_start(self, r):
195 195 log.msg('Engines started with pids: %r' % r)
196 196 return r
197 197
198 198 def _handle_stop(self, r):
199 199 log.msg('Engines received signal: %r' % r)
200 200 return r
201 201
202 202 def signal(self, sig):
203 203 dlist = []
204 204 for el in self.launchers:
205 205 d = el.get_stop_deferred()
206 206 dlist.append(d)
207 207 el.signal(sig)
208 208 dfinal = gatherBoth(dlist, consumeErrors=True)
209 209 dfinal.addCallback(self._handle_stop)
210 210 return dfinal
211 211
212 212 def interrupt_then_kill(self, delay=1.0):
213 213 dlist = []
214 214 for el in self.launchers:
215 215 d = el.get_stop_deferred()
216 216 dlist.append(d)
217 217 el.interrupt_then_kill(delay)
218 218 dfinal = gatherBoth(dlist, consumeErrors=True)
219 219 dfinal.addCallback(self._handle_stop)
220 220 return dfinal
221 221
222 222
223 223 class BatchEngineSet(object):
224 224
225 225 # Subclasses must fill these in. See PBSEngineSet
226 226 submit_command = ''
227 227 delete_command = ''
228 228 job_id_regexp = ''
229 229
230 230 def __init__(self, template_file, **kwargs):
231 231 self.template_file = template_file
232 232 self.context = {}
233 233 self.context.update(kwargs)
234 234 self.batch_file = 'batch-script'
235 235
236 236 def parse_job_id(self, output):
237 237 m = re.match(self.job_id_regexp, output)
238 238 if m is not None:
239 239 job_id = m.group()
240 240 else:
241 241 raise Exception("job id couldn't be determined: %s" % output)
242 242 self.job_id = job_id
243 243 print 'Job started with job id:', job_id
244 244 return job_id
245 245
246 246 def write_batch_script(self, n):
247 247 print 'n', n
248 248 self.context['n'] = n
249 249 template = open(self.template_file, 'r').read()
250 250 print 'template', template
251 log.msg(template)
252 log.msg(repr(self.context))
251 253 script_as_string = Itpl.itplns(template, self.context)
252 print 'script', script_as_string
254 log.msg(script_as_string)
253 255 f = open(self.batch_file,'w')
254 256 f.write(script_as_string)
255 257 f.close()
256 258
257 259 def handle_error(self, f):
258 260 f.printTraceback()
259 f.raiseException()
261 #f.raiseException()
260 262
261 263 def start(self, n):
262 264 self.write_batch_script(n)
263 265 d = getProcessOutput(self.submit_command,
264 266 [self.batch_file],env=os.environ)
265 267 d.addCallback(self.parse_job_id)
266 #d.addErrback(self.handle_error)
268 d.addErrback(self.handle_error)
267 269 return d
268 270
269 271 def kill(self):
270 272 d = getProcessOutput(self.delete_command,
271 273 [self.job_id],env=os.environ)
272 274 return d
273 275
274 276 class PBSEngineSet(BatchEngineSet):
275 277
276 278 submit_command = 'qsub'
277 279 delete_command = 'qdel'
278 280 job_id_regexp = '\d+'
279 281
280 282 def __init__(self, template_file, **kwargs):
281 283 BatchEngineSet.__init__(self, template_file, **kwargs)
282 284
283 285
284 286 def main_local(args):
285 287 cont_args = []
286 288 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
287 289 if args.x:
288 290 cont_args.append('-x')
289 291 if args.y:
290 292 cont_args.append('-y')
291 293 cl = ControllerLauncher(extra_args=cont_args)
292 294 dstart = cl.start()
293 295 def start_engines(cont_pid):
294 296 engine_args = []
295 297 engine_args.append('--logfile=%s' % \
296 298 pjoin(args.logdir,'ipengine%s-' % cont_pid))
297 299 eset = LocalEngineSet(extra_args=engine_args)
298 300 def shutdown(signum, frame):
299 301 log.msg('Stopping local cluster')
300 302 # We are still playing with the times here, but these seem
301 303 # to be reliable in allowing everything to exit cleanly.
302 304 eset.interrupt_then_kill(0.5)
303 305 cl.interrupt_then_kill(0.5)
304 306 reactor.callLater(1.0, reactor.stop)
305 307 signal.signal(signal.SIGINT,shutdown)
306 308 d = eset.start(args.n)
307 309 return d
308 310 def delay_start(cont_pid):
309 311 # This is needed because the controller doesn't start listening
310 312 # right when it starts and the controller needs to write
311 313 # furl files for the engine to pick up
312 314 reactor.callLater(1.0, start_engines, cont_pid)
313 315 dstart.addCallback(delay_start)
314 316 dstart.addErrback(lambda f: f.raiseException())
315 317
316 318 def main_mpirun(args):
317 319 cont_args = []
318 320 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
319 321 if args.x:
320 322 cont_args.append('-x')
321 323 if args.y:
322 324 cont_args.append('-y')
323 325 cl = ControllerLauncher(extra_args=cont_args)
324 326 dstart = cl.start()
325 327 def start_engines(cont_pid):
326 328 raw_args = ['mpirun']
327 329 raw_args.extend(['-n',str(args.n)])
328 330 raw_args.append('ipengine')
329 331 raw_args.append('-l')
330 332 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
331 333 raw_args.append('--mpi=%s' % args.mpi)
332 334 eset = ProcessLauncher(raw_args)
333 335 def shutdown(signum, frame):
334 336 log.msg('Stopping local cluster')
335 337 # We are still playing with the times here, but these seem
336 338 # to be reliable in allowing everything to exit cleanly.
337 339 eset.interrupt_then_kill(1.0)
338 340 cl.interrupt_then_kill(1.0)
339 341 reactor.callLater(2.0, reactor.stop)
340 342 signal.signal(signal.SIGINT,shutdown)
341 343 d = eset.start()
342 344 return d
343 345 def delay_start(cont_pid):
344 346 # This is needed because the controller doesn't start listening
345 347 # right when it starts and the controller needs to write
346 348 # furl files for the engine to pick up
347 349 reactor.callLater(1.0, start_engines, cont_pid)
348 350 dstart.addCallback(delay_start)
349 351 dstart.addErrback(lambda f: f.raiseException())
350 352
351 353 def main_pbs(args):
352 cl = ControllerLauncher()
354 cont_args = []
355 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
356 if args.x:
357 cont_args.append('-x')
358 if args.y:
359 cont_args.append('-y')
360 cl = ControllerLauncher(extra_args=cont_args)
353 361 dstart = cl.start()
354 362 def start_engines(r):
355 pbs_set = PBSEngineSet('pbs.template')
356 print pbs_set.template_file
363 pbs_set = PBSEngineSet(args.pbsscript)
357 364 d = pbs_set.start(args.n)
358 365 return d
359 366 dstart.addCallback(start_engines)
360 dstart.addErrback(lambda f: f.printTraceback())
367 dstart.addErrback(lambda f: f.raiseException())
361 368
362 369
363 370 def get_args():
364 371 base_parser = argparse.ArgumentParser(add_help=False)
365 372 base_parser.add_argument(
366 373 '-x',
367 374 action='store_true',
368 375 dest='x',
369 376 help='turn off client security'
370 377 )
371 378 base_parser.add_argument(
372 379 '-y',
373 380 action='store_true',
374 381 dest='y',
375 382 help='turn off engine security'
376 383 )
377 384 base_parser.add_argument(
378 385 "--logdir",
379 386 type=str,
380 387 dest="logdir",
381 388 help="directory to put log files (default=$IPYTHONDIR/log)",
382 389 default=pjoin(get_ipython_dir(),'log')
383 390 )
384 391 base_parser.add_argument(
385 392 "-n",
386 393 "--num",
387 394 type=int,
388 395 dest="n",
389 396 default=2,
390 397 help="the number of engines to start"
391 398 )
392 399
393 400 parser = argparse.ArgumentParser(
394 401 description='IPython cluster startup. This starts a controller and\
395 402 engines using various approaches'
396 403 )
397 404 subparsers = parser.add_subparsers(
398 405 help='available cluster types. For help, do "ipcluster TYPE --help"')
399 406
400 407 parser_local = subparsers.add_parser(
401 408 'local',
402 409 help='run a local cluster',
403 410 parents=[base_parser]
404 411 )
405 412 parser_local.set_defaults(func=main_local)
406 413
407 414 parser_mpirun = subparsers.add_parser(
408 415 'mpirun',
409 416 help='run a cluster using mpirun',
410 417 parents=[base_parser]
411 418 )
412 419 parser_mpirun.add_argument(
413 420 "--mpi",
414 421 type=str,
415 422 dest="mpi",
416 423 default='mpi4py',
417 424 help="how to call MPI_Init (default=mpi4py)"
418 425 )
419 426 parser_mpirun.set_defaults(func=main_mpirun)
420 427
421 parser_pbs = subparsers.add_parser('pbs', help='run a pbs cluster')
422 parser_pbs.add_argument('--pbs-script', type=str, dest='pbsscript',
423 help='PBS script template')
428 parser_pbs = subparsers.add_parser(
429 'pbs',
430 help='run a pbs cluster',
431 parents=[base_parser]
432 )
433 parser_pbs.add_argument(
434 '--pbs-script',
435 type=str,
436 dest='pbsscript',
437 help='PBS script template',
438 default='pbs.template'
439 )
424 440 parser_pbs.set_defaults(func=main_pbs)
425 441 args = parser.parse_args()
426 442 return args
427 443
428 444 def main():
429 445 args = get_args()
430 446 reactor.callWhenRunning(args.func, args)
431 447 log.startLogging(sys.stdout)
432 448 reactor.run()
433 449
434 450 if __name__ == '__main__':
435 451 main()
General Comments 0
You need to be logged in to leave comments. Login now