##// END OF EJS Templates
add delay configurable to EngineSetLaunchers...
MinRK -
Show More
@@ -1,1141 +1,1151
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
import copy
import logging
import os
import re
import stat
import time
27 27
28 28 # signal imports, handling various platforms, versions
29 29
30 30 from signal import SIGINT, SIGTERM
31 31 try:
32 32 from signal import SIGKILL
33 33 except ImportError:
34 34 # Windows
35 35 SIGKILL=SIGTERM
36 36
37 37 try:
38 38 # Windows >= 2.7, 3.2
39 39 from signal import CTRL_C_EVENT as SIGINT
40 40 except ImportError:
41 41 pass
42 42
43 43 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # Python < 2.7 has no subprocess.check_output; provide a stand-in built
    # on Popen that follows the same contract as the stdlib function.
    from subprocess import CalledProcessError

    def check_output(*args, **kwargs):
        """Run a command and return what it wrote to stdout.

        Positional and keyword arguments are forwarded to ``Popen``; the
        ``stdout`` keyword is always forced to ``PIPE`` so the output can
        be captured.

        Raises
        ------
        CalledProcessError
            If the command exits with a non-zero status, mirroring
            ``subprocess.check_output`` so callers can rely on the same
            failure signal on every Python version.
        """
        kwargs['stdout'] = PIPE
        p = Popen(*args, **kwargs)
        out, _ = p.communicate()
        retcode = p.poll()
        if retcode:
            cmd = kwargs.get('args')
            if cmd is None:
                cmd = args[0]
            raise CalledProcessError(retcode, cmd)
        return out
53 53
54 54 from zmq.eventloop import ioloop
55 55
56 56 from IPython.config.application import Application
57 57 from IPython.config.configurable import LoggingConfigurable
58 58 from IPython.utils.text import EvalFormatter
59 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
59 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 60 from IPython.utils.path import get_ipython_module_path
61 61 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62 62
63 63 from .win32support import forward_read_events
64 64
65 65 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
66 66
# True when the interpreter reports the 'nt' OS family.
WINDOWS = (os.name == 'nt')

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------

# Each *_cmd_argv value is the result of pycmd2argv() applied to the path of
# the named IPython application module; the launchers below use them as the
# default command prefix for the corresponding program.

ipcluster_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipclusterapp')
)

ipengine_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipengineapp')
)

ipcontroller_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipcontrollerapp')
)
85 85
86 86 #-----------------------------------------------------------------------------
87 87 # Base launchers and errors
88 88 #-----------------------------------------------------------------------------
89 89
90 90
class LauncherError(Exception):
    """Base class for the errors raised by the launchers in this module."""


class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the process's current state.

    For example, starting a launcher whose state is no longer 'before'.
    """


class UnknownStatus(LauncherError):
    """Launcher error for a status that cannot be interpreted.

    NOTE(review): not raised anywhere in the visible part of this file;
    confirm the intended use against callers.
    """
101 101
102 102
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process.

    Subclasses implement :meth:`find_args`, :meth:`start`, :meth:`stop` and
    :meth:`signal`.  The launcher tracks a ``state`` string that moves from
    'before' to 'running' (via :meth:`notify_start`) to 'after' (via
    :meth:`notify_stop`).
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # Data recorded by notify_start / notify_stop.
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Default value for ``loop``: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        # state is one of: 'before', 'running', 'after'
        self.state = 'before'
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The list of cmd and args used to start the process.

        Delegates to :meth:`find_args`; the first element is treated as the
        process name (it is what the start/stop log messages report).
        """
        return self.find_args()

    def find_args(self):
        """Build the args list returned by the ``.args`` property.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into a single space-separated string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True while the launcher's state is 'running'."""
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register ``f`` to be called with this Launcher's stop_data.

        If the process has already finished (state 'after'), ``f`` is called
        immediately and its result returned; otherwise it is queued until
        :meth:`notify_stop` runs.
        """
        if self.state != 'after':
            self.stop_callbacks.append(f)
            return None
        return f(self.stop_data)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        Logs the process startup, records ``data`` as ``start_data`` and sets
        the state to 'running'.  Returns ``data`` unchanged so it can be used
        as a pass-through callback.
        """
        self.log.info('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        Logs the process stopping, records ``data`` as ``stop_data``, sets
        the state to 'after' and runs the callbacks registered via
        :meth:`on_stop` (most recently registered first).  Returns ``data``.
        """
        self.log.info('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Only the callbacks present when notification begins are consumed.
        remaining = len(self.stop_callbacks)
        while remaining:
            callback = self.stop_callbacks.pop()
            callback(data)
            remaining -= 1
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
214 214
215 215
216 216 #-----------------------------------------------------------------------------
217 217 # Local process launchers
218 218 #-----------------------------------------------------------------------------
219 219
220 220
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    The child is started with ``Popen`` using ``self.work_dir`` as its
    working directory.  Its stdout and stderr are registered with the event
    loop and forwarded to the log, and a periodic callback polls for exit.
    """

    # This is used to construct self.args, which is passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Int(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        """Return the configured ``cmd_and_args`` list."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its output and exit into the loop.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the 'before' state.
        """
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            # On Windows the pipes are wrapped by forward_read_events and the
            # returned objects are what gets registered with the loop.
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        """Stop the process by interrupting it, then killing it after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait ``delay`` seconds and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            # Best effort: the delayed KILL below is still scheduled.
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _read_output(self, forwarded, pipe):
        """Read the next piece of output from one of the child's streams."""
        if WINDOWS:
            return forwarded.recv()
        return pipe.readline()

    def _relay_output(self, line, log_method):
        """Log one piece of child output, or poll for exit if it is empty."""
        if not line:
            # a stopped process will be readable but return empty strings
            self.poll()
            return
        # Strip only a real trailing newline, so output without one does not
        # lose its final character.
        if line.endswith('\n'):
            line = line[:-1]
        log_method(line)

    def handle_stdout(self, fd, events):
        """Loop handler: forward the child's stdout to the log at INFO level."""
        line = self._read_output(self.stdout, self.process.stdout)
        self._relay_output(line, self.log.info)

    def handle_stderr(self, fd, events):
        """Loop handler: forward the child's stderr to the log at ERROR level."""
        line = self._read_output(self.stderr, self.process.stderr)
        self._relay_output(line, self.log.error)

    def poll(self):
        """Check whether the process has exited and return its status.

        On exit, stops the poller, removes the output handlers from the loop
        and calls :meth:`notify_stop` with the exit code and pid.  Returns
        None while the process is still running.
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
318 318
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
        help="""command-line args to pass to ipcontroller""")

    def find_args(self):
        """Return the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile_dir):
        """Start the controller by profile_dir.

        The ``--profile-dir`` option is appended to ``controller_args``
        before the process is launched.
        """
        profile_option = '--profile-dir=%s' % profile_dir
        self.controller_args.append(profile_option)
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
337 337
338 338
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, profile_dir):
        """Start the engine by profile_dir.

        The ``--profile-dir`` option is appended to ``engine_args`` before
        the process is launched.
        """
        profile_option = '--profile-dir=%s' % profile_dir
        self.engine_args.append(profile_option)
        self.profile_dir = unicode(profile_dir)
        return super(LocalEngineLauncher, self).start()
357 357
358 358
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    One ``launcher_class`` instance is created per engine and tracked in
    ``launchers``.  The set reports itself stopped once every tracked
    engine has stopped.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # launchers of the engines that are still running, keyed by start index
    launchers = Dict()
    # stop data of the engines that have stopped, keyed like ``launchers``
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir.

        Sleeps ``self.delay`` seconds (blocking) before each engine after
        the first.  Returns the list of values returned by each engine
        launcher's ``start``.
        """
        self.profile_dir = unicode(profile_dir)
        dlist = []
        for i in range(n):
            if i > 0:
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
            # Give each engine launcher its own copy of the engine args,
            # since the launcher's start() extends the list.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(profile_dir)
            if i == 0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        # The consumeErrors here could be dangerous
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        # dfinal.addCallback(self.notify_start)
        return dlist

    def find_args(self):
        """Return a placeholder name; the set has no single command line."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every tracked engine; return the list of results."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then kill after ``delay`` seconds, every tracked engine."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def stop(self):
        """Stop all engines via :meth:`interrupt_then_kill`."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for one engine: record its data and drop its launcher.

        When the last tracked engine has stopped, :meth:`notify_stop` is
        called with the collected ``stop_data``.
        """
        pid = data['pid']
        for idx, el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        else:
            # No tracked launcher owns this pid.  Do not drop an unrelated
            # launcher (or fail on an unbound index); just report it.
            self.log.warning("Ignoring stop notification for untracked pid: %r" % pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
430 438
431 439
432 440 #-----------------------------------------------------------------------------
433 441 # MPIExec launchers
434 442 #-----------------------------------------------------------------------------
435 443
436 444
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'], config=True,
        help="The program to start via mpiexec.")
    program_args = List([], config=True,
        help="The command line argument to the program."
    )
    # number of instances requested from mpiexec via ``-n``
    n = Int(1)

    def find_args(self):
        """Build self.args using all the fields.

        The result is: mpi_cmd, ``-n <n>``, mpi_args, program, program_args.
        """
        count_option = ['-n', str(self.n)]
        mpiexec_part = self.mpi_cmd + count_option + self.mpi_args
        return mpiexec_part + self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
462 470
463 471
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="Popen command to launch the Contropper"
    )
    controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcontroller."
    )
    n = Int(1)

    def start(self, profile_dir):
        """Start the controller by profile_dir.

        Appends ``--profile-dir`` to ``controller_args`` and launches a
        single instance through mpiexec.
        """
        profile_option = '--profile-dir=%s' % profile_dir
        self.controller_args.append(profile_option)
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the argv: mpiexec part followed by the controller command."""
        count_option = ['-n', str(self.n)]
        mpiexec_part = self.mpi_cmd + count_option + self.mpi_args
        return mpiexec_part + self.controller_cmd + self.controller_args
485 493
486 494
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines with a single mpiexec invocation."""

    program = List(ipengine_cmd_argv, config=True,
        help="Popen command for ipengine"
    )
    program_args = List(
        ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments for ipengine."
    )
    n = Int(1)

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        profile_option = '--profile-dir=%s' % profile_dir
        self.program_args.append(profile_option)
        self.profile_dir = unicode(profile_dir)
        # n is set before logging so the logged command line shows the
        # requested engine count.
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)
505 513
506 514 #-----------------------------------------------------------------------------
507 515 # SSH launchers
508 516 #-----------------------------------------------------------------------------
509 517
510 518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
511 519
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    program = List(['date'], config=True,
        help="Program to launch via ssh")
    program_args = List([], config=True,
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    def _hostname_changed(self, name, old, new):
        """Recompute ``location`` from the new hostname and the current user."""
        self.location = (u'%s@%s' % (self.user, new)) if self.user else new

    def _user_changed(self, name, old, new):
        """Recompute ``location`` from the new user and the current hostname."""
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """Build the argv: ssh command and args, location, remote program."""
        ssh_part = self.ssh_cmd + self.ssh_args + [self.location]
        return ssh_part + self.program + self.program_args

    def start(self, profile_dir, hostname=None, user=None):
        """Launch the program over ssh.

        ``hostname`` and ``user`` override the configured values when given.
        """
        self.profile_dir = unicode(profile_dir)
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        """Close the ssh connection; ``sig`` itself is not forwarded."""
        if self.state != 'running':
            return
        # send escaped ssh connection-closer
        self.process.stdin.write('~.')
        self.process.stdin.flush()
562 570
563 571
564 572
class SSHControllerLauncher(SSHLauncher):
    """SSHLauncher whose default remote program is ipcontroller."""

    program = List(
        ipcontroller_cmd_argv, config=True,
        help="remote ipcontroller command."
    )
    program_args = List(
        ['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO],
        config=True,
        help="Command line arguments to ipcontroller."
    )
571 579
572 580
class SSHEngineLauncher(SSHLauncher):
    """SSHLauncher whose default remote program is ipengine."""

    program = List(ipengine_cmd_argv, config=True,
        help="remote ipengine command.")
    # Command line arguments for ipengine.  The log level option is spelled
    # '--log-level', matching every other launcher in this file.
    program_args = List(
        ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to ipengine."
    )
581 589
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch sets of engines on remote hosts over ssh."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch.  This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def start(self, n, profile_dir):
        """Start engines by profile or profile_dir.

        `n` is ignored, and the `engines` config property is used instead.
        Each value in `engines` is either an engine count, or a
        ``(count, args)`` pair giving the program args for that host.  A key
        of the form ``user@host`` selects the ssh user.

        Sleeps ``self.delay`` seconds before each engine after the first on
        a given host.  Returns the list of values returned by each engine
        launcher's ``start``.
        """
        self.profile_dir = unicode(profile_dir)
        dlist = []
        for host, spec in self.engines.iteritems():
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            for i in range(count):
                if i > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)

                # Give each engine launcher its own copy of the args so the
                # launchers on one host do not share a single list.
                el.program_args = list(args)
                el.on_stop(self._notice_engine_stopped)
                d = el.start(profile_dir, user=user, hostname=host)
                if i == 0:
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                self.launchers[host+str(i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
619 629
620 630
621 631
622 632 #-----------------------------------------------------------------------------
623 633 # Windows HPC Server 2008 scheduler launchers
624 634 #-----------------------------------------------------------------------------
625 635
626 636
627 637 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to talk to the Windows HPC job scheduler.

    On Windows the command is located with ``find_cmd('job')``; if that
    fails, or on any other platform, the bare name 'job' is returned.
    """
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
637 647
638 648
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels Windows HPC scheduler jobs.

    Subclasses implement :meth:`write_job_file` to produce the job
    description that :meth:`start` submits with ``job_cmd``.
    """

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE: this name is rebound by the ``job_file`` property further down in
    # the class body, so the property is what attribute access actually uses.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job file: ``work_dir`` joined with ``job_file_name``."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for ``n`` tasks; implemented by subclasses."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Return a fixed process name, used in the start/stop log messages."""
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is the first match of ``job_id_regexp`` in ``output``; it is
        also stored as ``self.job_id``.

        Raises
        ------
        LauncherError
            If ``output`` contains no match.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it and returns the parsed job id.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job and notify observers.

        A failure of the cancel command is treated as the job having already
        stopped.  Returns the command output, or that substitute message.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # Deliberately best-effort, but let KeyboardInterrupt and
            # SystemExit propagate instead of being swallowed.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
717 727
718 728
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller as a Windows HPC scheduler job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    extra_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write the controller job description to ``job_file``.

        ``n`` is accepted for interface compatibility; a single controller
        task is always added to the job.
        """
        job = IPControllerJob(config=self.config)

        task = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir
        # Add the extra args (the profile-dir option set by self.start()).
        task.controller_args.extend(self.extra_args)
        job.add_task(task)

        target = self.job_file
        self.log.info("Writing job description file: %s" % target)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file inside ``profile_dir``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.extra_args = ['--profile-dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
750 760
751 761
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of engines as a Windows HPC scheduler job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    extra_args = List([], config=False,
        help="extra args to pas to ipengine")

    def write_job_file(self, n):
        """Write a job description containing ``n`` engine tasks to ``job_file``."""
        job = IPEngineSetJob(config=self.config)

        for _ in range(n):
            task = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            task.work_directory = self.profile_dir
            # Add the extra args (the profile-dir option set by self.start()).
            task.engine_args.extend(self.extra_args)
            job.add_task(task)

        target = self.job_file
        self.log.info("Writing job description file: %s" % target)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file inside ``profile_dir``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n, profile_dir):
        """Start n engines by profile_dir."""
        self.extra_args = ['--profile-dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
784 794
785 795
786 796 #-----------------------------------------------------------------------------
787 797 # Batch (PBS) system launchers
788 798 #-----------------------------------------------------------------------------
789 799
790 800 class BatchSystemLauncher(BaseLauncher):
791 801 """Launch an external process using a batch system.
792 802
793 803 This class is designed to work with UNIX batch systems like PBS, LSF,
794 804 GridEngine, etc. The overall model is that there are different commands
795 805 like qsub, qdel, etc. that handle the starting and stopping of the process.
796 806
797 807 This class also has the notion of a batch script. The ``batch_template``
798 808 attribute can be set to a string that is a template for the batch script.
799 809 This template is instantiated using string formatting. Thus the template can
800 810 use {n} fot the number of instances. Subclasses can add additional variables
801 811 to the template dict.
802 812 """
803 813
804 814 # Subclasses must fill these in. See PBSEngineSet
805 815 submit_command = List([''], config=True,
806 816 help="The name of the command line program used to submit jobs.")
807 817 delete_command = List([''], config=True,
808 818 help="The name of the command line program used to delete jobs.")
809 819 job_id_regexp = Unicode('', config=True,
810 820 help="""A regular expression used to get the job id from the output of the
811 821 submit_command.""")
812 822 batch_template = Unicode('', config=True,
813 823 help="The string that is the batch script template itself.")
814 824 batch_template_file = Unicode(u'', config=True,
815 825 help="The file that contains the batch template.")
816 826 batch_file_name = Unicode(u'batch_script', config=True,
817 827 help="The filename of the instantiated batch script.")
818 828 queue = Unicode(u'', config=True,
819 829 help="The PBS Queue.")
820 830
821 831 # not configurable, override in subclasses
822 832 # PBS Job Array regex
823 833 job_array_regexp = Unicode('')
824 834 job_array_template = Unicode('')
825 835 # PBS Queue regex
826 836 queue_regexp = Unicode('')
827 837 queue_template = Unicode('')
828 838 # The default batch template, override in subclasses
829 839 default_template = Unicode('')
830 840 # The full path to the instantiated batch script.
831 841 batch_file = Unicode(u'')
832 842 # the format dict used with batch_template:
833 843 context = Dict()
834 844 # the Formatter instance for rendering the templates:
835 845 formatter = Instance(EvalFormatter, (), {})
836 846
837 847
def find_args(self):
    """Return the submit command followed by the batch script path."""
    argv = list(self.submit_command)
    argv.append(self.batch_file)
    return argv
840 850
def __init__(self, work_dir=u'.', config=None, **kwargs):
    """Initialize the launcher and compute the batch script path.

    ``batch_file`` is set to ``batch_file_name`` inside ``work_dir``.
    """
    super(BatchSystemLauncher, self).__init__(
        work_dir=work_dir, config=config, **kwargs
    )
    script_dir = self.work_dir
    script_name = self.batch_file_name
    self.batch_file = os.path.join(script_dir, script_name)
846 856
def parse_job_id(self, output):
    """Take the output of the submit command and return the job id.

    The id is the first match of ``job_id_regexp`` in ``output``; it is
    also stored as ``self.job_id``.  Raises LauncherError when ``output``
    contains no match.
    """
    m = re.search(self.job_id_regexp, output)
    if m is None:
        raise LauncherError("Job id couldn't be determined: %s" % output)
    job_id = m.group()
    self.job_id = job_id
    self.log.info('Job submitted with job id: %r' % job_id)
    return job_id
857 867
def write_batch_script(self, n):
    """Instantiate the batch script template and write it to `batch_file`.

    The template is chosen by priority: `batch_template` if set, else the
    contents of `batch_template_file`, else `default_template`.  If the
    chosen template has no job-array directive (per `job_array_regexp`),
    `job_array_template` is inserted after its first line; likewise
    `queue_template` is inserted when `queue` is set and `queue_regexp`
    finds no queue directive.  The result is rendered with `context`
    (which gains the keys 'n' and 'queue') and written as a file that is
    readable, writable and executable by the owner only.

    Note that `self.batch_template` is updated in place with the chosen
    template and any inserted directive lines.
    """
    self.context['n'] = n
    self.context['queue'] = self.queue
    # first priority is batch_template if set
    if self.batch_template_file and not self.batch_template:
        # second priority is batch_template_file
        with open(self.batch_template_file) as f:
            self.batch_template = f.read()
    if not self.batch_template:
        # third (last) priority is default_template
        self.batch_template = self.default_template

    # Add job-array / queue directives whenever the selected template does
    # not already contain them (this applies to user templates as well).
    # partition() is used instead of split('\n', 1) so that a template
    # without any newline does not raise ValueError on unpacking.
    if not re.search(self.job_array_regexp, self.batch_template):
        self.log.info("adding job array settings to batch script")
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

    if self.queue and not re.search(self.queue_regexp, self.batch_template):
        self.log.info("adding PBS queue settings to batch script")
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

    script_as_string = self.formatter.format(self.batch_template, **self.context)
    self.log.info('Writing instantiated batch script: %s' % self.batch_file)

    with open(self.batch_file, 'w') as f:
        f.write(script_as_string)
    # owner-only read/write/execute
    os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
893 903
def start(self, n, profile_dir):
    """Submit a batch job that starts n copies of the process.

    Writes the batch script, runs the submit command (`self.args`), parses
    the job id out of its output, passes it to `notify_start` and returns it.
    """
    # profile_dir goes into the context so that the batch script template
    # can refer to it as {profile_dir}
    self.context['profile_dir'] = profile_dir
    self.profile_dir = unicode(profile_dir)
    self.write_batch_script(n)

    submit_output = check_output(self.args, env=os.environ)
    submitted_id = self.parse_job_id(submit_output)
    self.notify_start(submitted_id)
    return submitted_id
906 916
def stop(self):
    """Run the delete command for `self.job_id` and return its output.

    The job id and the command output are also handed to `notify_stop`.
    """
    kill_cmd = self.delete_command + [self.job_id]
    kill_output = check_output(kill_cmd, env=os.environ)
    # Pass the output of the kill cmd along with the job id
    self.notify_stop({'job_id': self.job_id, 'output': kill_output})
    return kill_output
911 921
912 922
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # The help text previously named 'qsub' as the default here; the actual
    # default is 'qdel'.
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Regex literals are raw strings so the backslash sequences reach the re
    # module untouched; the pattern values are unchanged.
    # Detects an existing "#PBS -t ..." job-array directive in a template.
    job_array_regexp = Unicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # Detects an existing "#PBS -q ..." queue directive in a template.
    queue_regexp = Unicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
928 938
929 939
class PBSControllerLauncher(PBSLauncher):
    """PBS launcher that submits a single controller job."""

    batch_file_name = Unicode(
        u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # Fallback batch script; {profile_dir} is filled in when it is rendered.
    default_template = Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir={profile_dir}
""" % ' '.join(ipcontroller_cmd_argv))

    def start(self, profile_dir):
        """Submit the controller job for `profile_dir` (always one copy)."""
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        parent = super(PBSControllerLauncher, self)
        return parent.start(1, profile_dir)
945 955
946 956
class PBSEngineSetLauncher(PBSLauncher):
    """PBS launcher that submits a job running a set of engines."""

    batch_file_name = Unicode(
        u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Fallback batch script; {profile_dir} is filled in when it is rendered.
    default_template = Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir={profile_dir}
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n, profile_dir):
        """Submit a job that starts `n` engines for `profile_dir`."""
        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        parent = super(PBSEngineSetLauncher, self)
        return parent.start(n, profile_dir)
961 971
962 972 #SGE is very similar to PBS
963 973
class SGELauncher(PBSLauncher):
    """Launcher for Sun GridEngine: PBS-like, but directives start with `#$`."""

    # job-array directive: detection regex and the line to insert
    job_array_regexp = Unicode(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    # queue directive: detection regex and the line to insert
    queue_regexp = Unicode(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
970 980
class SGEControllerLauncher(SGELauncher):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # Default batch script; {profile_dir} is filled in when it is rendered.
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir={profile_dir}
""" % (' '.join(ipcontroller_cmd_argv)))

    def start(self, profile_dir):
        """Start the controller by profile or profile_dir."""
        # The message previously named PBSControllerLauncher, which made SGE
        # runs look like PBS runs in the log.
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        return super(SGEControllerLauncher, self).start(1, profile_dir)
986 996
class SGEEngineSetLauncher(SGELauncher):
    """SGE launcher that submits a job running a set of engines."""

    batch_file_name = Unicode(
        u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Fallback batch script; {profile_dir} is filled in when it is rendered.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir={profile_dir}
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n, profile_dir):
        """Submit a job that starts `n` engines for `profile_dir`."""
        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
        parent = super(SGEEngineSetLauncher, self)
        return parent.start(n, profile_dir)
1001 1011
1002 1012
1003 1013 # LSF launchers
1004 1014
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Detects an existing job-array directive such as
    # "#BSUB -J ipengine[1-8]" or "#BSUB -J ipengine[1-{n}]".  The previous
    # pattern allowed no whitespace after -J and no {placeholder} upper bound,
    # so it did not match the directive produced by job_array_template and
    # the directive could be inserted a second time.
    job_array_regexp = Unicode(r'#BSUB[ \t]+-J[ \t]*\w+\[\d+-(?:\d+|\{[^}]+\})\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = Unicode(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n, profile_dir):
        """Start n copies of the process using LSF batch system.

        This can't reuse the base class implementation because bsub expects
        the shell script on its standard input in order to honor the #BSUB
        directives:
            bsub < script

        Returns the job id parsed from the submit command's output.
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.context['profile_dir'] = profile_dir
        self.profile_dir = unicode(profile_dir)
        self.write_batch_script(n)
        # self.args is the submit command followed by the batch script path.
        # Use every element of the submit command (not only the first) so
        # that a multi-word submit_command is passed through intact.
        submit_cmd, script = self.args[:-1], self.args[-1]
        piped_cmd = ' '.join(submit_cmd) + '<"' + script + '"'
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, _ = p.communicate()
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1039 1049
1040 1050
class LSFControllerLauncher(LSFLauncher):
    """LSF launcher that submits a single controller job."""

    batch_file_name = Unicode(
        u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # Fallback batch script; '%%J' is an escaped percent sign and becomes
    # '%J' in the template, and {profile_dir} is filled in at render time.
    default_template = Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir={profile_dir}
""" % ' '.join(ipcontroller_cmd_argv))

    def start(self, profile_dir):
        """Submit the controller job for `profile_dir` (always one copy)."""
        self.log.info("Starting LSFControllerLauncher: %r" % self.args)
        parent = super(LSFControllerLauncher, self)
        return parent.start(1, profile_dir)
1057 1067
1058 1068
class LSFEngineSetLauncher(LSFLauncher):
    """LSF launcher that submits a job running a set of engines."""

    batch_file_name = Unicode(
        u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Fallback batch script; '%%J' is an escaped percent sign and becomes
    # '%J' in the template, and {profile_dir} is filled in at render time.
    default_template = Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir={profile_dir}
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n, profile_dir):
        """Submit a job that starts `n` engines for `profile_dir`."""
        self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
        parent = super(LSFEngineSetLauncher, self)
        return parent.start(n, profile_dir)
1073 1083
1074 1084
1075 1085 #-----------------------------------------------------------------------------
1076 1086 # A launcher for ipcluster itself!
1077 1087 #-----------------------------------------------------------------------------
1078 1088
1079 1089
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program as an external process."""

    ipcluster_cmd = List(
        ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.")
    # not configurable: the ipcluster subcommand and the value used for --n
    ipcluster_subcommand = Unicode('start')
    ipcluster_n = Int(2)

    def find_args(self):
        """Build the command line: command, subcommand, --n, extra arguments."""
        head = self.ipcluster_cmd + [self.ipcluster_subcommand]
        count_flag = ['--n=%i' % self.ipcluster_n]
        return head + count_flag + self.ipcluster_args

    def start(self):
        """Log the command line, then start via the parent class."""
        self.log.info("Starting ipcluster: %r" % self.args)
        parent = super(IPClusterLauncher, self)
        return parent.start()
1098 1108
1099 1109 #-----------------------------------------------------------------------------
1100 1110 # Collections of launchers
1101 1111 #-----------------------------------------------------------------------------
1102 1112
# Launcher classes grouped by the backend they target.
local_launchers = [
    LocalControllerLauncher, LocalEngineLauncher, LocalEngineSetLauncher,
]
mpi_launchers = [
    MPIExecLauncher, MPIExecControllerLauncher, MPIExecEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher, SSHControllerLauncher, SSHEngineLauncher,
    SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher, WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher, SGEControllerLauncher, SGEEngineSetLauncher,
]
lsf_launchers = [
    LSFLauncher, LSFControllerLauncher, LSFEngineSetLauncher,
]
# Every launcher class, in the same group order as above.
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
    + lsf_launchers
)
1141 1151
General Comments 0
You need to be logged in to leave comments. Login now