##// END OF EJS Templates
Added import time to IPython/parallel/apps/launcher.py...
Ben Edwards -
Show More
@@ -1,1151 +1,1152 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import re
26 26 import stat
27 import time
27 28
28 29 # signal imports, handling various platforms, versions
29 30
30 31 from signal import SIGINT, SIGTERM
31 32 try:
32 33 from signal import SIGKILL
33 34 except ImportError:
34 35 # Windows
35 36 SIGKILL=SIGTERM
36 37
37 38 try:
38 39 # Windows >= 2.7, 3.2
39 40 from signal import CTRL_C_EVENT as SIGINT
40 41 except ImportError:
41 42 pass
42 43
43 44 from subprocess import Popen, PIPE, STDOUT
44 45 try:
45 46 from subprocess import check_output
46 47 except ImportError:
47 48 # pre-2.7, define check_output with Popen
48 49 def check_output(*args, **kwargs):
49 50 kwargs.update(dict(stdout=PIPE))
50 51 p = Popen(*args, **kwargs)
51 52 out,err = p.communicate()
52 53 return out
53 54
54 55 from zmq.eventloop import ioloop
55 56
56 57 from IPython.config.application import Application
57 58 from IPython.config.configurable import LoggingConfigurable
58 59 from IPython.utils.text import EvalFormatter
59 60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 61 from IPython.utils.path import get_ipython_module_path
61 62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62 63
63 64 from .win32support import forward_read_events
64 65
65 66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
66 67
67 68 WINDOWS = os.name == 'nt'
68 69
69 70 #-----------------------------------------------------------------------------
70 71 # Paths to the kernel apps
71 72 #-----------------------------------------------------------------------------
72 73
73 74
74 75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
75 76 'IPython.parallel.apps.ipclusterapp'
76 77 ))
77 78
78 79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
79 80 'IPython.parallel.apps.ipengineapp'
80 81 ))
81 82
82 83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
83 84 'IPython.parallel.apps.ipcontrollerapp'
84 85 ))
85 86
86 87 #-----------------------------------------------------------------------------
87 88 # Base launchers and errors
88 89 #-----------------------------------------------------------------------------
89 90
90 91
91 92 class LauncherError(Exception):
92 93 pass
93 94
94 95
95 96 class ProcessStateError(LauncherError):
96 97 pass
97 98
98 99
99 100 class UnknownStatus(LauncherError):
100 101 pass
101 102
102 103
103 104 class BaseLauncher(LoggingConfigurable):
104 105 """An asbtraction for starting, stopping and signaling a process."""
105 106
106 107 # In all of the launchers, the work_dir is where child processes will be
107 108 # run. This will usually be the profile_dir, but may not be. any work_dir
108 109 # passed into the __init__ method will override the config value.
109 110 # This should not be used to set the work_dir for the actual engine
110 111 # and controller. Instead, use their own config files or the
111 112 # controller_args, engine_args attributes of the launchers to add
112 113 # the work_dir option.
113 114 work_dir = Unicode(u'.')
114 115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
115 116
116 117 start_data = Any()
117 118 stop_data = Any()
118 119
119 120 def _loop_default(self):
120 121 return ioloop.IOLoop.instance()
121 122
122 123 def __init__(self, work_dir=u'.', config=None, **kwargs):
123 124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
124 125 self.state = 'before' # can be before, running, after
125 126 self.stop_callbacks = []
126 127 self.start_data = None
127 128 self.stop_data = None
128 129
129 130 @property
130 131 def args(self):
131 132 """A list of cmd and args that will be used to start the process.
132 133
133 134 This is what is passed to :func:`spawnProcess` and the first element
134 135 will be the process name.
135 136 """
136 137 return self.find_args()
137 138
138 139 def find_args(self):
139 140 """The ``.args`` property calls this to find the args list.
140 141
141 142 Subcommand should implement this to construct the cmd and args.
142 143 """
143 144 raise NotImplementedError('find_args must be implemented in a subclass')
144 145
145 146 @property
146 147 def arg_str(self):
147 148 """The string form of the program arguments."""
148 149 return ' '.join(self.args)
149 150
150 151 @property
151 152 def running(self):
152 153 """Am I running."""
153 154 if self.state == 'running':
154 155 return True
155 156 else:
156 157 return False
157 158
158 159 def start(self):
159 160 """Start the process."""
160 161 raise NotImplementedError('start must be implemented in a subclass')
161 162
162 163 def stop(self):
163 164 """Stop the process and notify observers of stopping.
164 165
165 166 This method will return None immediately.
166 167 To observe the actual process stopping, see :meth:`on_stop`.
167 168 """
168 169 raise NotImplementedError('stop must be implemented in a subclass')
169 170
170 171 def on_stop(self, f):
171 172 """Register a callback to be called with this Launcher's stop_data
172 173 when the process actually finishes.
173 174 """
174 175 if self.state=='after':
175 176 return f(self.stop_data)
176 177 else:
177 178 self.stop_callbacks.append(f)
178 179
179 180 def notify_start(self, data):
180 181 """Call this to trigger startup actions.
181 182
182 183 This logs the process startup and sets the state to 'running'. It is
183 184 a pass-through so it can be used as a callback.
184 185 """
185 186
186 187 self.log.info('Process %r started: %r' % (self.args[0], data))
187 188 self.start_data = data
188 189 self.state = 'running'
189 190 return data
190 191
191 192 def notify_stop(self, data):
192 193 """Call this to trigger process stop actions.
193 194
194 195 This logs the process stopping and sets the state to 'after'. Call
195 196 this to trigger callbacks registered via :meth:`on_stop`."""
196 197
197 198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 199 self.stop_data = data
199 200 self.state = 'after'
200 201 for i in range(len(self.stop_callbacks)):
201 202 d = self.stop_callbacks.pop()
202 203 d(data)
203 204 return data
204 205
205 206 def signal(self, sig):
206 207 """Signal the process.
207 208
208 209 Parameters
209 210 ----------
210 211 sig : str or int
211 212 'KILL', 'INT', etc., or any signal number
212 213 """
213 214 raise NotImplementedError('signal must be implemented in a subclass')
214 215
215 216
216 217 #-----------------------------------------------------------------------------
217 218 # Local process launchers
218 219 #-----------------------------------------------------------------------------
219 220
220 221
221 222 class LocalProcessLauncher(BaseLauncher):
222 223 """Start and stop an external process in an asynchronous manner.
223 224
224 225 This will launch the external process with a working directory of
225 226 ``self.work_dir``.
226 227 """
227 228
228 229 # This is used to to construct self.args, which is passed to
229 230 # spawnProcess.
230 231 cmd_and_args = List([])
231 232 poll_frequency = Int(100) # in ms
232 233
233 234 def __init__(self, work_dir=u'.', config=None, **kwargs):
234 235 super(LocalProcessLauncher, self).__init__(
235 236 work_dir=work_dir, config=config, **kwargs
236 237 )
237 238 self.process = None
238 239 self.poller = None
239 240
240 241 def find_args(self):
241 242 return self.cmd_and_args
242 243
243 244 def start(self):
244 245 if self.state == 'before':
245 246 self.process = Popen(self.args,
246 247 stdout=PIPE,stderr=PIPE,stdin=PIPE,
247 248 env=os.environ,
248 249 cwd=self.work_dir
249 250 )
250 251 if WINDOWS:
251 252 self.stdout = forward_read_events(self.process.stdout)
252 253 self.stderr = forward_read_events(self.process.stderr)
253 254 else:
254 255 self.stdout = self.process.stdout.fileno()
255 256 self.stderr = self.process.stderr.fileno()
256 257 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
257 258 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
258 259 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
259 260 self.poller.start()
260 261 self.notify_start(self.process.pid)
261 262 else:
262 263 s = 'The process was already started and has state: %r' % self.state
263 264 raise ProcessStateError(s)
264 265
265 266 def stop(self):
266 267 return self.interrupt_then_kill()
267 268
268 269 def signal(self, sig):
269 270 if self.state == 'running':
270 271 if WINDOWS and sig != SIGINT:
271 272 # use Windows tree-kill for better child cleanup
272 273 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
273 274 else:
274 275 self.process.send_signal(sig)
275 276
276 277 def interrupt_then_kill(self, delay=2.0):
277 278 """Send INT, wait a delay and then send KILL."""
278 279 try:
279 280 self.signal(SIGINT)
280 281 except Exception:
281 282 self.log.debug("interrupt failed")
282 283 pass
283 284 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
284 285 self.killer.start()
285 286
286 287 # callbacks, etc:
287 288
288 289 def handle_stdout(self, fd, events):
289 290 if WINDOWS:
290 291 line = self.stdout.recv()
291 292 else:
292 293 line = self.process.stdout.readline()
293 294 # a stopped process will be readable but return empty strings
294 295 if line:
295 296 self.log.info(line[:-1])
296 297 else:
297 298 self.poll()
298 299
299 300 def handle_stderr(self, fd, events):
300 301 if WINDOWS:
301 302 line = self.stderr.recv()
302 303 else:
303 304 line = self.process.stderr.readline()
304 305 # a stopped process will be readable but return empty strings
305 306 if line:
306 307 self.log.error(line[:-1])
307 308 else:
308 309 self.poll()
309 310
310 311 def poll(self):
311 312 status = self.process.poll()
312 313 if status is not None:
313 314 self.poller.stop()
314 315 self.loop.remove_handler(self.stdout)
315 316 self.loop.remove_handler(self.stderr)
316 317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
317 318 return status
318 319
319 320 class LocalControllerLauncher(LocalProcessLauncher):
320 321 """Launch a controller as a regular external process."""
321 322
322 323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
323 324 help="""Popen command to launch ipcontroller.""")
324 325 # Command line arguments to ipcontroller.
325 326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
326 327 help="""command-line args to pass to ipcontroller""")
327 328
328 329 def find_args(self):
329 330 return self.controller_cmd + self.controller_args
330 331
331 332 def start(self, profile_dir):
332 333 """Start the controller by profile_dir."""
333 334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
334 335 self.profile_dir = unicode(profile_dir)
335 336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
336 337 return super(LocalControllerLauncher, self).start()
337 338
338 339
339 340 class LocalEngineLauncher(LocalProcessLauncher):
340 341 """Launch a single engine as a regular externall process."""
341 342
342 343 engine_cmd = List(ipengine_cmd_argv, config=True,
343 344 help="""command to launch the Engine.""")
344 345 # Command line arguments for ipengine.
345 346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
346 347 help="command-line arguments to pass to ipengine"
347 348 )
348 349
349 350 def find_args(self):
350 351 return self.engine_cmd + self.engine_args
351 352
352 353 def start(self, profile_dir):
353 354 """Start the engine by profile_dir."""
354 355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
355 356 self.profile_dir = unicode(profile_dir)
356 357 return super(LocalEngineLauncher, self).start()
357 358
358 359
359 360 class LocalEngineSetLauncher(BaseLauncher):
360 361 """Launch a set of engines as regular external processes."""
361 362
362 363 # Command line arguments for ipengine.
363 364 engine_args = List(
364 365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
365 366 help="command-line arguments to pass to ipengine"
366 367 )
367 368 delay = CFloat(0.1, config=True,
368 369 help="""delay (in seconds) between starting each engine after the first.
369 370 This can help force the engines to get their ids in order, or limit
370 371 process flood when starting many engines."""
371 372 )
372 373
373 374 # launcher class
374 375 launcher_class = LocalEngineLauncher
375 376
376 377 launchers = Dict()
377 378 stop_data = Dict()
378 379
379 380 def __init__(self, work_dir=u'.', config=None, **kwargs):
380 381 super(LocalEngineSetLauncher, self).__init__(
381 382 work_dir=work_dir, config=config, **kwargs
382 383 )
383 384 self.stop_data = {}
384 385
385 386 def start(self, n, profile_dir):
386 387 """Start n engines by profile or profile_dir."""
387 388 self.profile_dir = unicode(profile_dir)
388 389 dlist = []
389 390 for i in range(n):
390 391 if i > 0:
391 392 time.sleep(self.delay)
392 393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
393 394 # Copy the engine args over to each engine launcher.
394 395 el.engine_args = copy.deepcopy(self.engine_args)
395 396 el.on_stop(self._notice_engine_stopped)
396 397 d = el.start(profile_dir)
397 398 if i==0:
398 399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 400 self.launchers[i] = el
400 401 dlist.append(d)
401 402 self.notify_start(dlist)
402 403 # The consumeErrors here could be dangerous
403 404 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 405 # dfinal.addCallback(self.notify_start)
405 406 return dlist
406 407
407 408 def find_args(self):
408 409 return ['engine set']
409 410
410 411 def signal(self, sig):
411 412 dlist = []
412 413 for el in self.launchers.itervalues():
413 414 d = el.signal(sig)
414 415 dlist.append(d)
415 416 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 417 return dlist
417 418
418 419 def interrupt_then_kill(self, delay=1.0):
419 420 dlist = []
420 421 for el in self.launchers.itervalues():
421 422 d = el.interrupt_then_kill(delay)
422 423 dlist.append(d)
423 424 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 425 return dlist
425 426
426 427 def stop(self):
427 428 return self.interrupt_then_kill()
428 429
429 430 def _notice_engine_stopped(self, data):
430 431 pid = data['pid']
431 432 for idx,el in self.launchers.iteritems():
432 433 if el.process.pid == pid:
433 434 break
434 435 self.launchers.pop(idx)
435 436 self.stop_data[idx] = data
436 437 if not self.launchers:
437 438 self.notify_stop(self.stop_data)
438 439
439 440
440 441 #-----------------------------------------------------------------------------
441 442 # MPIExec launchers
442 443 #-----------------------------------------------------------------------------
443 444
444 445
445 446 class MPIExecLauncher(LocalProcessLauncher):
446 447 """Launch an external process using mpiexec."""
447 448
448 449 mpi_cmd = List(['mpiexec'], config=True,
449 450 help="The mpiexec command to use in starting the process."
450 451 )
451 452 mpi_args = List([], config=True,
452 453 help="The command line arguments to pass to mpiexec."
453 454 )
454 455 program = List(['date'], config=True,
455 456 help="The program to start via mpiexec.")
456 457 program_args = List([], config=True,
457 458 help="The command line argument to the program."
458 459 )
459 460 n = Int(1)
460 461
461 462 def find_args(self):
462 463 """Build self.args using all the fields."""
463 464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 465 self.program + self.program_args
465 466
466 467 def start(self, n):
467 468 """Start n instances of the program using mpiexec."""
468 469 self.n = n
469 470 return super(MPIExecLauncher, self).start()
470 471
471 472
472 473 class MPIExecControllerLauncher(MPIExecLauncher):
473 474 """Launch a controller using mpiexec."""
474 475
475 476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 477 help="Popen command to launch the Contropper"
477 478 )
478 479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
479 480 help="Command line arguments to pass to ipcontroller."
480 481 )
481 482 n = Int(1)
482 483
483 484 def start(self, profile_dir):
484 485 """Start the controller by profile_dir."""
485 486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
486 487 self.profile_dir = unicode(profile_dir)
487 488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 489 return super(MPIExecControllerLauncher, self).start(1)
489 490
490 491 def find_args(self):
491 492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
492 493 self.controller_cmd + self.controller_args
493 494
494 495
495 496 class MPIExecEngineSetLauncher(MPIExecLauncher):
496 497
497 498 program = List(ipengine_cmd_argv, config=True,
498 499 help="Popen command for ipengine"
499 500 )
500 501 program_args = List(
501 502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
502 503 help="Command line arguments for ipengine."
503 504 )
504 505 n = Int(1)
505 506
506 507 def start(self, n, profile_dir):
507 508 """Start n engines by profile or profile_dir."""
508 509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
509 510 self.profile_dir = unicode(profile_dir)
510 511 self.n = n
511 512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 513 return super(MPIExecEngineSetLauncher, self).start(n)
513 514
514 515 #-----------------------------------------------------------------------------
515 516 # SSH launchers
516 517 #-----------------------------------------------------------------------------
517 518
518 519 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
519 520
520 521 class SSHLauncher(LocalProcessLauncher):
521 522 """A minimal launcher for ssh.
522 523
523 524 To be useful this will probably have to be extended to use the ``sshx``
524 525 idea for environment variables. There could be other things this needs
525 526 as well.
526 527 """
527 528
528 529 ssh_cmd = List(['ssh'], config=True,
529 530 help="command for starting ssh")
530 531 ssh_args = List(['-tt'], config=True,
531 532 help="args to pass to ssh")
532 533 program = List(['date'], config=True,
533 534 help="Program to launch via ssh")
534 535 program_args = List([], config=True,
535 536 help="args to pass to remote program")
536 537 hostname = Unicode('', config=True,
537 538 help="hostname on which to launch the program")
538 539 user = Unicode('', config=True,
539 540 help="username for ssh")
540 541 location = Unicode('', config=True,
541 542 help="user@hostname location for ssh in one setting")
542 543
543 544 def _hostname_changed(self, name, old, new):
544 545 if self.user:
545 546 self.location = u'%s@%s' % (self.user, new)
546 547 else:
547 548 self.location = new
548 549
549 550 def _user_changed(self, name, old, new):
550 551 self.location = u'%s@%s' % (new, self.hostname)
551 552
552 553 def find_args(self):
553 554 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 555 self.program + self.program_args
555 556
556 557 def start(self, profile_dir, hostname=None, user=None):
557 558 self.profile_dir = unicode(profile_dir)
558 559 if hostname is not None:
559 560 self.hostname = hostname
560 561 if user is not None:
561 562 self.user = user
562 563
563 564 return super(SSHLauncher, self).start()
564 565
565 566 def signal(self, sig):
566 567 if self.state == 'running':
567 568 # send escaped ssh connection-closer
568 569 self.process.stdin.write('~.')
569 570 self.process.stdin.flush()
570 571
571 572
572 573
573 574 class SSHControllerLauncher(SSHLauncher):
574 575
575 576 program = List(ipcontroller_cmd_argv, config=True,
576 577 help="remote ipcontroller command.")
577 578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
578 579 help="Command line arguments to ipcontroller.")
579 580
580 581
581 582 class SSHEngineLauncher(SSHLauncher):
582 583 program = List(ipengine_cmd_argv, config=True,
583 584 help="remote ipengine command.")
584 585 # Command line arguments for ipengine.
585 586 program_args = List(
586 587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
587 588 help="Command line arguments to ipengine."
588 589 )
589 590
590 591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 592 launcher_class = SSHEngineLauncher
592 593 engines = Dict(config=True,
593 594 help="""dict of engines to launch. This is a dict by hostname of ints,
594 595 corresponding to the number of engines to start on that host.""")
595 596
596 597 def start(self, n, profile_dir):
597 598 """Start engines by profile or profile_dir.
598 599 `n` is ignored, and the `engines` config property is used instead.
599 600 """
600 601
601 602 self.profile_dir = unicode(profile_dir)
602 603 dlist = []
603 604 for host, n in self.engines.iteritems():
604 605 if isinstance(n, (tuple, list)):
605 606 n, args = n
606 607 else:
607 608 args = copy.deepcopy(self.engine_args)
608 609
609 610 if '@' in host:
610 611 user,host = host.split('@',1)
611 612 else:
612 613 user=None
613 614 for i in range(n):
614 615 if i > 0:
615 616 time.sleep(self.delay)
616 617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
617 618
618 619 # Copy the engine args over to each engine launcher.
619 620 i
620 621 el.program_args = args
621 622 el.on_stop(self._notice_engine_stopped)
622 623 d = el.start(profile_dir, user=user, hostname=host)
623 624 if i==0:
624 625 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
625 626 self.launchers[host+str(i)] = el
626 627 dlist.append(d)
627 628 self.notify_start(dlist)
628 629 return dlist
629 630
630 631
631 632
632 633 #-----------------------------------------------------------------------------
633 634 # Windows HPC Server 2008 scheduler launchers
634 635 #-----------------------------------------------------------------------------
635 636
636 637
637 638 # This is only used on Windows.
638 639 def find_job_cmd():
639 640 if WINDOWS:
640 641 try:
641 642 return find_cmd('job')
642 643 except (FindCmdError, ImportError):
643 644 # ImportError will be raised if win32api is not installed
644 645 return 'job'
645 646 else:
646 647 return 'job'
647 648
648 649
649 650 class WindowsHPCLauncher(BaseLauncher):
650 651
651 652 job_id_regexp = Unicode(r'\d+', config=True,
652 653 help="""A regular expression used to get the job id from the output of the
653 654 submit_command. """
654 655 )
655 656 job_file_name = Unicode(u'ipython_job.xml', config=True,
656 657 help="The filename of the instantiated job script.")
657 658 # The full path to the instantiated job script. This gets made dynamically
658 659 # by combining the work_dir with the job_file_name.
659 660 job_file = Unicode(u'')
660 661 scheduler = Unicode('', config=True,
661 662 help="The hostname of the scheduler to submit the job to.")
662 663 job_cmd = Unicode(find_job_cmd(), config=True,
663 664 help="The command for submitting jobs.")
664 665
665 666 def __init__(self, work_dir=u'.', config=None, **kwargs):
666 667 super(WindowsHPCLauncher, self).__init__(
667 668 work_dir=work_dir, config=config, **kwargs
668 669 )
669 670
670 671 @property
671 672 def job_file(self):
672 673 return os.path.join(self.work_dir, self.job_file_name)
673 674
674 675 def write_job_file(self, n):
675 676 raise NotImplementedError("Implement write_job_file in a subclass.")
676 677
677 678 def find_args(self):
678 679 return [u'job.exe']
679 680
680 681 def parse_job_id(self, output):
681 682 """Take the output of the submit command and return the job id."""
682 683 m = re.search(self.job_id_regexp, output)
683 684 if m is not None:
684 685 job_id = m.group()
685 686 else:
686 687 raise LauncherError("Job id couldn't be determined: %s" % output)
687 688 self.job_id = job_id
688 689 self.log.info('Job started with job id: %r' % job_id)
689 690 return job_id
690 691
691 692 def start(self, n):
692 693 """Start n copies of the process using the Win HPC job scheduler."""
693 694 self.write_job_file(n)
694 695 args = [
695 696 'submit',
696 697 '/jobfile:%s' % self.job_file,
697 698 '/scheduler:%s' % self.scheduler
698 699 ]
699 700 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
700 701
701 702 output = check_output([self.job_cmd]+args,
702 703 env=os.environ,
703 704 cwd=self.work_dir,
704 705 stderr=STDOUT
705 706 )
706 707 job_id = self.parse_job_id(output)
707 708 self.notify_start(job_id)
708 709 return job_id
709 710
710 711 def stop(self):
711 712 args = [
712 713 'cancel',
713 714 self.job_id,
714 715 '/scheduler:%s' % self.scheduler
715 716 ]
716 717 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
717 718 try:
718 719 output = check_output([self.job_cmd]+args,
719 720 env=os.environ,
720 721 cwd=self.work_dir,
721 722 stderr=STDOUT
722 723 )
723 724 except:
724 725 output = 'The job already appears to be stoppped: %r' % self.job_id
725 726 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
726 727 return output
727 728
728 729
729 730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
730 731
731 732 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
732 733 help="WinHPC xml job file.")
733 734 extra_args = List([], config=False,
734 735 help="extra args to pass to ipcontroller")
735 736
736 737 def write_job_file(self, n):
737 738 job = IPControllerJob(config=self.config)
738 739
739 740 t = IPControllerTask(config=self.config)
740 741 # The tasks work directory is *not* the actual work directory of
741 742 # the controller. It is used as the base path for the stdout/stderr
742 743 # files that the scheduler redirects to.
743 744 t.work_directory = self.profile_dir
744 745 # Add the profile_dir and from self.start().
745 746 t.controller_args.extend(self.extra_args)
746 747 job.add_task(t)
747 748
748 749 self.log.info("Writing job description file: %s" % self.job_file)
749 750 job.write(self.job_file)
750 751
751 752 @property
752 753 def job_file(self):
753 754 return os.path.join(self.profile_dir, self.job_file_name)
754 755
755 756 def start(self, profile_dir):
756 757 """Start the controller by profile_dir."""
757 758 self.extra_args = ['--profile-dir=%s'%profile_dir]
758 759 self.profile_dir = unicode(profile_dir)
759 760 return super(WindowsHPCControllerLauncher, self).start(1)
760 761
761 762
762 763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
763 764
764 765 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
765 766 help="jobfile for ipengines job")
766 767 extra_args = List([], config=False,
767 768 help="extra args to pas to ipengine")
768 769
769 770 def write_job_file(self, n):
770 771 job = IPEngineSetJob(config=self.config)
771 772
772 773 for i in range(n):
773 774 t = IPEngineTask(config=self.config)
774 775 # The tasks work directory is *not* the actual work directory of
775 776 # the engine. It is used as the base path for the stdout/stderr
776 777 # files that the scheduler redirects to.
777 778 t.work_directory = self.profile_dir
778 779 # Add the profile_dir and from self.start().
779 780 t.engine_args.extend(self.extra_args)
780 781 job.add_task(t)
781 782
782 783 self.log.info("Writing job description file: %s" % self.job_file)
783 784 job.write(self.job_file)
784 785
785 786 @property
786 787 def job_file(self):
787 788 return os.path.join(self.profile_dir, self.job_file_name)
788 789
789 790 def start(self, n, profile_dir):
790 791 """Start the controller by profile_dir."""
791 792 self.extra_args = ['--profile-dir=%s'%profile_dir]
792 793 self.profile_dir = unicode(profile_dir)
793 794 return super(WindowsHPCEngineSetLauncher, self).start(n)
794 795
795 796
796 797 #-----------------------------------------------------------------------------
797 798 # Batch (PBS) system launchers
798 799 #-----------------------------------------------------------------------------
799 800
800 801 class BatchSystemLauncher(BaseLauncher):
801 802 """Launch an external process using a batch system.
802 803
803 804 This class is designed to work with UNIX batch systems like PBS, LSF,
804 805 GridEngine, etc. The overall model is that there are different commands
805 806 like qsub, qdel, etc. that handle the starting and stopping of the process.
806 807
807 808 This class also has the notion of a batch script. The ``batch_template``
808 809 attribute can be set to a string that is a template for the batch script.
809 810 This template is instantiated using string formatting. Thus the template can
810 811 use {n} fot the number of instances. Subclasses can add additional variables
811 812 to the template dict.
812 813 """
813 814
814 815 # Subclasses must fill these in. See PBSEngineSet
815 816 submit_command = List([''], config=True,
816 817 help="The name of the command line program used to submit jobs.")
817 818 delete_command = List([''], config=True,
818 819 help="The name of the command line program used to delete jobs.")
819 820 job_id_regexp = Unicode('', config=True,
820 821 help="""A regular expression used to get the job id from the output of the
821 822 submit_command.""")
822 823 batch_template = Unicode('', config=True,
823 824 help="The string that is the batch script template itself.")
824 825 batch_template_file = Unicode(u'', config=True,
825 826 help="The file that contains the batch template.")
826 827 batch_file_name = Unicode(u'batch_script', config=True,
827 828 help="The filename of the instantiated batch script.")
828 829 queue = Unicode(u'', config=True,
829 830 help="The PBS Queue.")
830 831
831 832 # not configurable, override in subclasses
832 833 # PBS Job Array regex
833 834 job_array_regexp = Unicode('')
834 835 job_array_template = Unicode('')
835 836 # PBS Queue regex
836 837 queue_regexp = Unicode('')
837 838 queue_template = Unicode('')
838 839 # The default batch template, override in subclasses
839 840 default_template = Unicode('')
840 841 # The full path to the instantiated batch script.
841 842 batch_file = Unicode(u'')
842 843 # the format dict used with batch_template:
843 844 context = Dict()
844 845 # the Formatter instance for rendering the templates:
845 846 formatter = Instance(EvalFormatter, (), {})
846 847
847 848
848 849 def find_args(self):
849 850 return self.submit_command + [self.batch_file]
850 851
851 852 def __init__(self, work_dir=u'.', config=None, **kwargs):
852 853 super(BatchSystemLauncher, self).__init__(
853 854 work_dir=work_dir, config=config, **kwargs
854 855 )
855 856 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
856 857
857 858 def parse_job_id(self, output):
858 859 """Take the output of the submit command and return the job id."""
859 860 m = re.search(self.job_id_regexp, output)
860 861 if m is not None:
861 862 job_id = m.group()
862 863 else:
863 864 raise LauncherError("Job id couldn't be determined: %s" % output)
864 865 self.job_id = job_id
865 866 self.log.info('Job submitted with job id: %r' % job_id)
866 867 return job_id
867 868
868 869 def write_batch_script(self, n):
869 870 """Instantiate and write the batch script to the work_dir."""
870 871 self.context['n'] = n
871 872 self.context['queue'] = self.queue
872 873 # first priority is batch_template if set
873 874 if self.batch_template_file and not self.batch_template:
874 875 # second priority is batch_template_file
875 876 with open(self.batch_template_file) as f:
876 877 self.batch_template = f.read()
877 878 if not self.batch_template:
878 879 # third (last) priority is default_template
879 880 self.batch_template = self.default_template
880 881
881 882 # add jobarray or queue lines to user-specified template
882 883 # note that this is *only* when user did not specify a template.
883 884 regex = re.compile(self.job_array_regexp)
884 885 # print regex.search(self.batch_template)
885 886 if not regex.search(self.batch_template):
886 887 self.log.info("adding job array settings to batch script")
887 888 firstline, rest = self.batch_template.split('\n',1)
888 889 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
889 890
890 891 regex = re.compile(self.queue_regexp)
891 892 # print regex.search(self.batch_template)
892 893 if self.queue and not regex.search(self.batch_template):
893 894 self.log.info("adding PBS queue settings to batch script")
894 895 firstline, rest = self.batch_template.split('\n',1)
895 896 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
896 897
897 898 script_as_string = self.formatter.format(self.batch_template, **self.context)
898 899 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
899 900
900 901 with open(self.batch_file, 'w') as f:
901 902 f.write(script_as_string)
902 903 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
903 904
904 905 def start(self, n, profile_dir):
905 906 """Start n copies of the process using a batch system."""
906 907 # Here we save profile_dir in the context so they
907 908 # can be used in the batch script template as {profile_dir}
908 909 self.context['profile_dir'] = profile_dir
909 910 self.profile_dir = unicode(profile_dir)
910 911 self.write_batch_script(n)
911 912 output = check_output(self.args, env=os.environ)
912 913
913 914 job_id = self.parse_job_id(output)
914 915 self.notify_start(job_id)
915 916 return job_id
916 917
917 918 def stop(self):
918 919 output = check_output(self.delete_command+[self.job_id], env=os.environ)
919 920 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
920 921 return output
921 922
922 923
923 924 class PBSLauncher(BatchSystemLauncher):
924 925 """A BatchSystemLauncher subclass for PBS."""
925 926
926 927 submit_command = List(['qsub'], config=True,
927 928 help="The PBS submit command ['qsub']")
928 929 delete_command = List(['qdel'], config=True,
929 930 help="The PBS delete command ['qsub']")
930 931 job_id_regexp = Unicode(r'\d+', config=True,
931 932 help="Regular expresion for identifying the job ID [r'\d+']")
932 933
933 934 batch_file = Unicode(u'')
934 935 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
935 936 job_array_template = Unicode('#PBS -t 1-{n}')
936 937 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
937 938 queue_template = Unicode('#PBS -q {queue}')
938 939
939 940
940 941 class PBSControllerLauncher(PBSLauncher):
941 942 """Launch a controller using PBS."""
942 943
943 944 batch_file_name = Unicode(u'pbs_controller', config=True,
944 945 help="batch file name for the controller job.")
945 946 default_template= Unicode("""#!/bin/sh
946 947 #PBS -V
947 948 #PBS -N ipcontroller
948 949 %s --log-to-file --profile-dir={profile_dir}
949 950 """%(' '.join(ipcontroller_cmd_argv)))
950 951
951 952 def start(self, profile_dir):
952 953 """Start the controller by profile or profile_dir."""
953 954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
954 955 return super(PBSControllerLauncher, self).start(1, profile_dir)
955 956
956 957
957 958 class PBSEngineSetLauncher(PBSLauncher):
958 959 """Launch Engines using PBS"""
959 960 batch_file_name = Unicode(u'pbs_engines', config=True,
960 961 help="batch file name for the engine(s) job.")
961 962 default_template= Unicode(u"""#!/bin/sh
962 963 #PBS -V
963 964 #PBS -N ipengine
964 965 %s --profile-dir={profile_dir}
965 966 """%(' '.join(ipengine_cmd_argv)))
966 967
967 968 def start(self, n, profile_dir):
968 969 """Start n engines by profile or profile_dir."""
969 970 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
970 971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
971 972
972 973 #SGE is very similar to PBS
973 974
974 975 class SGELauncher(PBSLauncher):
975 976 """Sun GridEngine is a PBS clone with slightly different syntax"""
976 977 job_array_regexp = Unicode('#\$\W+\-t')
977 978 job_array_template = Unicode('#$ -t 1-{n}')
978 979 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
979 980 queue_template = Unicode('#$ -q {queue}')
980 981
981 982 class SGEControllerLauncher(SGELauncher):
982 983 """Launch a controller using SGE."""
983 984
984 985 batch_file_name = Unicode(u'sge_controller', config=True,
985 986 help="batch file name for the ipontroller job.")
986 987 default_template= Unicode(u"""#$ -V
987 988 #$ -S /bin/sh
988 989 #$ -N ipcontroller
989 990 %s --log-to-file --profile-dir={profile_dir}
990 991 """%(' '.join(ipcontroller_cmd_argv)))
991 992
992 993 def start(self, profile_dir):
993 994 """Start the controller by profile or profile_dir."""
994 995 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
995 996 return super(SGEControllerLauncher, self).start(1, profile_dir)
996 997
997 998 class SGEEngineSetLauncher(SGELauncher):
998 999 """Launch Engines with SGE"""
999 1000 batch_file_name = Unicode(u'sge_engines', config=True,
1000 1001 help="batch file name for the engine(s) job.")
1001 1002 default_template = Unicode("""#$ -V
1002 1003 #$ -S /bin/sh
1003 1004 #$ -N ipengine
1004 1005 %s --profile-dir={profile_dir}
1005 1006 """%(' '.join(ipengine_cmd_argv)))
1006 1007
1007 1008 def start(self, n, profile_dir):
1008 1009 """Start n engines by profile or profile_dir."""
1009 1010 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1010 1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1011 1012
1012 1013
1013 1014 # LSF launchers
1014 1015
1015 1016 class LSFLauncher(BatchSystemLauncher):
1016 1017 """A BatchSystemLauncher subclass for LSF."""
1017 1018
1018 1019 submit_command = List(['bsub'], config=True,
1019 1020 help="The PBS submit command ['bsub']")
1020 1021 delete_command = List(['bkill'], config=True,
1021 1022 help="The PBS delete command ['bkill']")
1022 1023 job_id_regexp = Unicode(r'\d+', config=True,
1023 1024 help="Regular expresion for identifying the job ID [r'\d+']")
1024 1025
1025 1026 batch_file = Unicode(u'')
1026 1027 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1027 1028 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1028 1029 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1029 1030 queue_template = Unicode('#BSUB -q {queue}')
1030 1031
1031 1032 def start(self, n, profile_dir):
1032 1033 """Start n copies of the process using LSF batch system.
1033 1034 This cant inherit from the base class because bsub expects
1034 1035 to be piped a shell script in order to honor the #BSUB directives :
1035 1036 bsub < script
1036 1037 """
1037 1038 # Here we save profile_dir in the context so they
1038 1039 # can be used in the batch script template as {profile_dir}
1039 1040 self.context['profile_dir'] = profile_dir
1040 1041 self.profile_dir = unicode(profile_dir)
1041 1042 self.write_batch_script(n)
1042 1043 #output = check_output(self.args, env=os.environ)
1043 1044 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1044 1045 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1045 1046 output,err = p.communicate()
1046 1047 job_id = self.parse_job_id(output)
1047 1048 self.notify_start(job_id)
1048 1049 return job_id
1049 1050
1050 1051
1051 1052 class LSFControllerLauncher(LSFLauncher):
1052 1053 """Launch a controller using LSF."""
1053 1054
1054 1055 batch_file_name = Unicode(u'lsf_controller', config=True,
1055 1056 help="batch file name for the controller job.")
1056 1057 default_template= Unicode("""#!/bin/sh
1057 1058 #BSUB -J ipcontroller
1058 1059 #BSUB -oo ipcontroller.o.%%J
1059 1060 #BSUB -eo ipcontroller.e.%%J
1060 1061 %s --log-to-file --profile-dir={profile_dir}
1061 1062 """%(' '.join(ipcontroller_cmd_argv)))
1062 1063
1063 1064 def start(self, profile_dir):
1064 1065 """Start the controller by profile or profile_dir."""
1065 1066 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1066 1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1067 1068
1068 1069
1069 1070 class LSFEngineSetLauncher(LSFLauncher):
1070 1071 """Launch Engines using LSF"""
1071 1072 batch_file_name = Unicode(u'lsf_engines', config=True,
1072 1073 help="batch file name for the engine(s) job.")
1073 1074 default_template= Unicode(u"""#!/bin/sh
1074 1075 #BSUB -oo ipengine.o.%%J
1075 1076 #BSUB -eo ipengine.e.%%J
1076 1077 %s --profile-dir={profile_dir}
1077 1078 """%(' '.join(ipengine_cmd_argv)))
1078 1079
1079 1080 def start(self, n, profile_dir):
1080 1081 """Start n engines by profile or profile_dir."""
1081 1082 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1082 1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1083 1084
1084 1085
1085 1086 #-----------------------------------------------------------------------------
1086 1087 # A launcher for ipcluster itself!
1087 1088 #-----------------------------------------------------------------------------
1088 1089
1089 1090
1090 1091 class IPClusterLauncher(LocalProcessLauncher):
1091 1092 """Launch the ipcluster program in an external process."""
1092 1093
1093 1094 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1094 1095 help="Popen command for ipcluster")
1095 1096 ipcluster_args = List(
1096 1097 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1097 1098 help="Command line arguments to pass to ipcluster.")
1098 1099 ipcluster_subcommand = Unicode('start')
1099 1100 ipcluster_n = Int(2)
1100 1101
1101 1102 def find_args(self):
1102 1103 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1103 1104 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1104 1105
1105 1106 def start(self):
1106 1107 self.log.info("Starting ipcluster: %r" % self.args)
1107 1108 return super(IPClusterLauncher, self).start()
1108 1109
1109 1110 #-----------------------------------------------------------------------------
1110 1111 # Collections of launchers
1111 1112 #-----------------------------------------------------------------------------
1112 1113
1113 1114 local_launchers = [
1114 1115 LocalControllerLauncher,
1115 1116 LocalEngineLauncher,
1116 1117 LocalEngineSetLauncher,
1117 1118 ]
1118 1119 mpi_launchers = [
1119 1120 MPIExecLauncher,
1120 1121 MPIExecControllerLauncher,
1121 1122 MPIExecEngineSetLauncher,
1122 1123 ]
1123 1124 ssh_launchers = [
1124 1125 SSHLauncher,
1125 1126 SSHControllerLauncher,
1126 1127 SSHEngineLauncher,
1127 1128 SSHEngineSetLauncher,
1128 1129 ]
1129 1130 winhpc_launchers = [
1130 1131 WindowsHPCLauncher,
1131 1132 WindowsHPCControllerLauncher,
1132 1133 WindowsHPCEngineSetLauncher,
1133 1134 ]
1134 1135 pbs_launchers = [
1135 1136 PBSLauncher,
1136 1137 PBSControllerLauncher,
1137 1138 PBSEngineSetLauncher,
1138 1139 ]
1139 1140 sge_launchers = [
1140 1141 SGELauncher,
1141 1142 SGEControllerLauncher,
1142 1143 SGEEngineSetLauncher,
1143 1144 ]
1144 1145 lsf_launchers = [
1145 1146 LSFLauncher,
1146 1147 LSFControllerLauncher,
1147 1148 LSFEngineSetLauncher,
1148 1149 ]
1149 1150 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1150 1151 + pbs_launchers + sge_launchers + lsf_launchers
1151 1152
General Comments 0
You need to be logged in to leave comments. Login now