##// END OF EJS Templates
don't automatically add jobarray or queue lines to user template...
MinRK -
Show More
@@ -1,1063 +1,1065 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 import copy
24 24 import logging
25 25 import os
26 26 import re
27 27 import stat
28 28
29 29 # signal imports, handling various platforms, versions
30 30
31 31 from signal import SIGINT, SIGTERM
32 32 try:
33 33 from signal import SIGKILL
34 34 except ImportError:
35 35 # Windows
36 36 SIGKILL=SIGTERM
37 37
38 38 try:
39 39 # Windows >= 2.7, 3.2
40 40 from signal import CTRL_C_EVENT as SIGINT
41 41 except ImportError:
42 42 pass
43 43
44 44 from subprocess import Popen, PIPE, STDOUT
45 45 try:
46 46 from subprocess import check_output
47 47 except ImportError:
48 48 # pre-2.7, define check_output with Popen
49 49 def check_output(*args, **kwargs):
50 50 kwargs.update(dict(stdout=PIPE))
51 51 p = Popen(*args, **kwargs)
52 52 out,err = p.communicate()
53 53 return out
54 54
55 55 from zmq.eventloop import ioloop
56 56
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 60 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
61 61 from IPython.utils.path import get_ipython_module_path
62 62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63 63
64 64 from .win32support import forward_read_events
65 65
66 66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67 67
68 68 WINDOWS = os.name == 'nt'
69 69
70 70 #-----------------------------------------------------------------------------
71 71 # Paths to the kernel apps
72 72 #-----------------------------------------------------------------------------
73 73
74 74
75 75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
76 76 'IPython.parallel.apps.ipclusterapp'
77 77 ))
78 78
79 79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 80 'IPython.parallel.apps.ipengineapp'
81 81 ))
82 82
83 83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
84 84 'IPython.parallel.apps.ipcontrollerapp'
85 85 ))
86 86
87 87 #-----------------------------------------------------------------------------
88 88 # Base launchers and errors
89 89 #-----------------------------------------------------------------------------
90 90
91 91
92 92 class LauncherError(Exception):
93 93 pass
94 94
95 95
96 96 class ProcessStateError(LauncherError):
97 97 pass
98 98
99 99
100 100 class UnknownStatus(LauncherError):
101 101 pass
102 102
103 103
104 104 class BaseLauncher(LoggingConfigurable):
105 105 """An asbtraction for starting, stopping and signaling a process."""
106 106
107 107 # In all of the launchers, the work_dir is where child processes will be
108 108 # run. This will usually be the profile_dir, but may not be. any work_dir
109 109 # passed into the __init__ method will override the config value.
110 110 # This should not be used to set the work_dir for the actual engine
111 111 # and controller. Instead, use their own config files or the
112 112 # controller_args, engine_args attributes of the launchers to add
113 113 # the work_dir option.
114 114 work_dir = Unicode(u'.')
115 115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
116 116
117 117 start_data = Any()
118 118 stop_data = Any()
119 119
120 120 def _loop_default(self):
121 121 return ioloop.IOLoop.instance()
122 122
123 123 def __init__(self, work_dir=u'.', config=None, **kwargs):
124 124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
125 125 self.state = 'before' # can be before, running, after
126 126 self.stop_callbacks = []
127 127 self.start_data = None
128 128 self.stop_data = None
129 129
130 130 @property
131 131 def args(self):
132 132 """A list of cmd and args that will be used to start the process.
133 133
134 134 This is what is passed to :func:`spawnProcess` and the first element
135 135 will be the process name.
136 136 """
137 137 return self.find_args()
138 138
139 139 def find_args(self):
140 140 """The ``.args`` property calls this to find the args list.
141 141
142 142 Subcommand should implement this to construct the cmd and args.
143 143 """
144 144 raise NotImplementedError('find_args must be implemented in a subclass')
145 145
146 146 @property
147 147 def arg_str(self):
148 148 """The string form of the program arguments."""
149 149 return ' '.join(self.args)
150 150
151 151 @property
152 152 def running(self):
153 153 """Am I running."""
154 154 if self.state == 'running':
155 155 return True
156 156 else:
157 157 return False
158 158
159 159 def start(self):
160 160 """Start the process."""
161 161 raise NotImplementedError('start must be implemented in a subclass')
162 162
163 163 def stop(self):
164 164 """Stop the process and notify observers of stopping.
165 165
166 166 This method will return None immediately.
167 167 To observe the actual process stopping, see :meth:`on_stop`.
168 168 """
169 169 raise NotImplementedError('stop must be implemented in a subclass')
170 170
171 171 def on_stop(self, f):
172 172 """Register a callback to be called with this Launcher's stop_data
173 173 when the process actually finishes.
174 174 """
175 175 if self.state=='after':
176 176 return f(self.stop_data)
177 177 else:
178 178 self.stop_callbacks.append(f)
179 179
180 180 def notify_start(self, data):
181 181 """Call this to trigger startup actions.
182 182
183 183 This logs the process startup and sets the state to 'running'. It is
184 184 a pass-through so it can be used as a callback.
185 185 """
186 186
187 187 self.log.info('Process %r started: %r' % (self.args[0], data))
188 188 self.start_data = data
189 189 self.state = 'running'
190 190 return data
191 191
192 192 def notify_stop(self, data):
193 193 """Call this to trigger process stop actions.
194 194
195 195 This logs the process stopping and sets the state to 'after'. Call
196 196 this to trigger callbacks registered via :meth:`on_stop`."""
197 197
198 198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 199 self.stop_data = data
200 200 self.state = 'after'
201 201 for i in range(len(self.stop_callbacks)):
202 202 d = self.stop_callbacks.pop()
203 203 d(data)
204 204 return data
205 205
206 206 def signal(self, sig):
207 207 """Signal the process.
208 208
209 209 Parameters
210 210 ----------
211 211 sig : str or int
212 212 'KILL', 'INT', etc., or any signal number
213 213 """
214 214 raise NotImplementedError('signal must be implemented in a subclass')
215 215
216 216
217 217 #-----------------------------------------------------------------------------
218 218 # Local process launchers
219 219 #-----------------------------------------------------------------------------
220 220
221 221
222 222 class LocalProcessLauncher(BaseLauncher):
223 223 """Start and stop an external process in an asynchronous manner.
224 224
225 225 This will launch the external process with a working directory of
226 226 ``self.work_dir``.
227 227 """
228 228
229 229 # This is used to to construct self.args, which is passed to
230 230 # spawnProcess.
231 231 cmd_and_args = List([])
232 232 poll_frequency = Int(100) # in ms
233 233
234 234 def __init__(self, work_dir=u'.', config=None, **kwargs):
235 235 super(LocalProcessLauncher, self).__init__(
236 236 work_dir=work_dir, config=config, **kwargs
237 237 )
238 238 self.process = None
239 239 self.poller = None
240 240
241 241 def find_args(self):
242 242 return self.cmd_and_args
243 243
244 244 def start(self):
245 245 if self.state == 'before':
246 246 self.process = Popen(self.args,
247 247 stdout=PIPE,stderr=PIPE,stdin=PIPE,
248 248 env=os.environ,
249 249 cwd=self.work_dir
250 250 )
251 251 if WINDOWS:
252 252 self.stdout = forward_read_events(self.process.stdout)
253 253 self.stderr = forward_read_events(self.process.stderr)
254 254 else:
255 255 self.stdout = self.process.stdout.fileno()
256 256 self.stderr = self.process.stderr.fileno()
257 257 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
258 258 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
259 259 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
260 260 self.poller.start()
261 261 self.notify_start(self.process.pid)
262 262 else:
263 263 s = 'The process was already started and has state: %r' % self.state
264 264 raise ProcessStateError(s)
265 265
266 266 def stop(self):
267 267 return self.interrupt_then_kill()
268 268
269 269 def signal(self, sig):
270 270 if self.state == 'running':
271 271 if WINDOWS and sig != SIGINT:
272 272 # use Windows tree-kill for better child cleanup
273 273 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
274 274 else:
275 275 self.process.send_signal(sig)
276 276
277 277 def interrupt_then_kill(self, delay=2.0):
278 278 """Send INT, wait a delay and then send KILL."""
279 279 try:
280 280 self.signal(SIGINT)
281 281 except Exception:
282 282 self.log.debug("interrupt failed")
283 283 pass
284 284 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
285 285 self.killer.start()
286 286
287 287 # callbacks, etc:
288 288
289 289 def handle_stdout(self, fd, events):
290 290 if WINDOWS:
291 291 line = self.stdout.recv()
292 292 else:
293 293 line = self.process.stdout.readline()
294 294 # a stopped process will be readable but return empty strings
295 295 if line:
296 296 self.log.info(line[:-1])
297 297 else:
298 298 self.poll()
299 299
300 300 def handle_stderr(self, fd, events):
301 301 if WINDOWS:
302 302 line = self.stderr.recv()
303 303 else:
304 304 line = self.process.stderr.readline()
305 305 # a stopped process will be readable but return empty strings
306 306 if line:
307 307 self.log.error(line[:-1])
308 308 else:
309 309 self.poll()
310 310
311 311 def poll(self):
312 312 status = self.process.poll()
313 313 if status is not None:
314 314 self.poller.stop()
315 315 self.loop.remove_handler(self.stdout)
316 316 self.loop.remove_handler(self.stderr)
317 317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 318 return status
319 319
320 320 class LocalControllerLauncher(LocalProcessLauncher):
321 321 """Launch a controller as a regular external process."""
322 322
323 323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 324 help="""Popen command to launch ipcontroller.""")
325 325 # Command line arguments to ipcontroller.
326 326 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
327 327 help="""command-line args to pass to ipcontroller""")
328 328
329 329 def find_args(self):
330 330 return self.controller_cmd + self.controller_args
331 331
332 332 def start(self, profile_dir):
333 333 """Start the controller by profile_dir."""
334 334 self.controller_args.extend(['profile_dir=%s'%profile_dir])
335 335 self.profile_dir = unicode(profile_dir)
336 336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 337 return super(LocalControllerLauncher, self).start()
338 338
339 339
340 340 class LocalEngineLauncher(LocalProcessLauncher):
341 341 """Launch a single engine as a regular externall process."""
342 342
343 343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 344 help="""command to launch the Engine.""")
345 345 # Command line arguments for ipengine.
346 346 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
347 347 help="command-line arguments to pass to ipengine"
348 348 )
349 349
350 350 def find_args(self):
351 351 return self.engine_cmd + self.engine_args
352 352
353 353 def start(self, profile_dir):
354 354 """Start the engine by profile_dir."""
355 355 self.engine_args.extend(['profile_dir=%s'%profile_dir])
356 356 self.profile_dir = unicode(profile_dir)
357 357 return super(LocalEngineLauncher, self).start()
358 358
359 359
360 360 class LocalEngineSetLauncher(BaseLauncher):
361 361 """Launch a set of engines as regular external processes."""
362 362
363 363 # Command line arguments for ipengine.
364 364 engine_args = List(
365 365 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
366 366 help="command-line arguments to pass to ipengine"
367 367 )
368 368 # launcher class
369 369 launcher_class = LocalEngineLauncher
370 370
371 371 launchers = Dict()
372 372 stop_data = Dict()
373 373
374 374 def __init__(self, work_dir=u'.', config=None, **kwargs):
375 375 super(LocalEngineSetLauncher, self).__init__(
376 376 work_dir=work_dir, config=config, **kwargs
377 377 )
378 378 self.stop_data = {}
379 379
380 380 def start(self, n, profile_dir):
381 381 """Start n engines by profile or profile_dir."""
382 382 self.profile_dir = unicode(profile_dir)
383 383 dlist = []
384 384 for i in range(n):
385 385 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
386 386 # Copy the engine args over to each engine launcher.
387 387 el.engine_args = copy.deepcopy(self.engine_args)
388 388 el.on_stop(self._notice_engine_stopped)
389 389 d = el.start(profile_dir)
390 390 if i==0:
391 391 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
392 392 self.launchers[i] = el
393 393 dlist.append(d)
394 394 self.notify_start(dlist)
395 395 # The consumeErrors here could be dangerous
396 396 # dfinal = gatherBoth(dlist, consumeErrors=True)
397 397 # dfinal.addCallback(self.notify_start)
398 398 return dlist
399 399
400 400 def find_args(self):
401 401 return ['engine set']
402 402
403 403 def signal(self, sig):
404 404 dlist = []
405 405 for el in self.launchers.itervalues():
406 406 d = el.signal(sig)
407 407 dlist.append(d)
408 408 # dfinal = gatherBoth(dlist, consumeErrors=True)
409 409 return dlist
410 410
411 411 def interrupt_then_kill(self, delay=1.0):
412 412 dlist = []
413 413 for el in self.launchers.itervalues():
414 414 d = el.interrupt_then_kill(delay)
415 415 dlist.append(d)
416 416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 417 return dlist
418 418
419 419 def stop(self):
420 420 return self.interrupt_then_kill()
421 421
422 422 def _notice_engine_stopped(self, data):
423 423 pid = data['pid']
424 424 for idx,el in self.launchers.iteritems():
425 425 if el.process.pid == pid:
426 426 break
427 427 self.launchers.pop(idx)
428 428 self.stop_data[idx] = data
429 429 if not self.launchers:
430 430 self.notify_stop(self.stop_data)
431 431
432 432
433 433 #-----------------------------------------------------------------------------
434 434 # MPIExec launchers
435 435 #-----------------------------------------------------------------------------
436 436
437 437
438 438 class MPIExecLauncher(LocalProcessLauncher):
439 439 """Launch an external process using mpiexec."""
440 440
441 441 mpi_cmd = List(['mpiexec'], config=True,
442 442 help="The mpiexec command to use in starting the process."
443 443 )
444 444 mpi_args = List([], config=True,
445 445 help="The command line arguments to pass to mpiexec."
446 446 )
447 447 program = List(['date'], config=True,
448 448 help="The program to start via mpiexec.")
449 449 program_args = List([], config=True,
450 450 help="The command line argument to the program."
451 451 )
452 452 n = Int(1)
453 453
454 454 def find_args(self):
455 455 """Build self.args using all the fields."""
456 456 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
457 457 self.program + self.program_args
458 458
459 459 def start(self, n):
460 460 """Start n instances of the program using mpiexec."""
461 461 self.n = n
462 462 return super(MPIExecLauncher, self).start()
463 463
464 464
465 465 class MPIExecControllerLauncher(MPIExecLauncher):
466 466 """Launch a controller using mpiexec."""
467 467
468 468 controller_cmd = List(ipcontroller_cmd_argv, config=True,
469 469 help="Popen command to launch the Contropper"
470 470 )
471 471 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
472 472 help="Command line arguments to pass to ipcontroller."
473 473 )
474 474 n = Int(1)
475 475
476 476 def start(self, profile_dir):
477 477 """Start the controller by profile_dir."""
478 478 self.controller_args.extend(['profile_dir=%s'%profile_dir])
479 479 self.profile_dir = unicode(profile_dir)
480 480 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
481 481 return super(MPIExecControllerLauncher, self).start(1)
482 482
483 483 def find_args(self):
484 484 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
485 485 self.controller_cmd + self.controller_args
486 486
487 487
488 488 class MPIExecEngineSetLauncher(MPIExecLauncher):
489 489
490 490 program = List(ipengine_cmd_argv, config=True,
491 491 help="Popen command for ipengine"
492 492 )
493 493 program_args = List(
494 494 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
495 495 help="Command line arguments for ipengine."
496 496 )
497 497 n = Int(1)
498 498
499 499 def start(self, n, profile_dir):
500 500 """Start n engines by profile or profile_dir."""
501 501 self.program_args.extend(['profile_dir=%s'%profile_dir])
502 502 self.profile_dir = unicode(profile_dir)
503 503 self.n = n
504 504 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
505 505 return super(MPIExecEngineSetLauncher, self).start(n)
506 506
507 507 #-----------------------------------------------------------------------------
508 508 # SSH launchers
509 509 #-----------------------------------------------------------------------------
510 510
511 511 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
512 512
513 513 class SSHLauncher(LocalProcessLauncher):
514 514 """A minimal launcher for ssh.
515 515
516 516 To be useful this will probably have to be extended to use the ``sshx``
517 517 idea for environment variables. There could be other things this needs
518 518 as well.
519 519 """
520 520
521 521 ssh_cmd = List(['ssh'], config=True,
522 522 help="command for starting ssh")
523 523 ssh_args = List(['-tt'], config=True,
524 524 help="args to pass to ssh")
525 525 program = List(['date'], config=True,
526 526 help="Program to launch via ssh")
527 527 program_args = List([], config=True,
528 528 help="args to pass to remote program")
529 529 hostname = Unicode('', config=True,
530 530 help="hostname on which to launch the program")
531 531 user = Unicode('', config=True,
532 532 help="username for ssh")
533 533 location = Unicode('', config=True,
534 534 help="user@hostname location for ssh in one setting")
535 535
536 536 def _hostname_changed(self, name, old, new):
537 537 if self.user:
538 538 self.location = u'%s@%s' % (self.user, new)
539 539 else:
540 540 self.location = new
541 541
542 542 def _user_changed(self, name, old, new):
543 543 self.location = u'%s@%s' % (new, self.hostname)
544 544
545 545 def find_args(self):
546 546 return self.ssh_cmd + self.ssh_args + [self.location] + \
547 547 self.program + self.program_args
548 548
549 549 def start(self, profile_dir, hostname=None, user=None):
550 550 self.profile_dir = unicode(profile_dir)
551 551 if hostname is not None:
552 552 self.hostname = hostname
553 553 if user is not None:
554 554 self.user = user
555 555
556 556 return super(SSHLauncher, self).start()
557 557
558 558 def signal(self, sig):
559 559 if self.state == 'running':
560 560 # send escaped ssh connection-closer
561 561 self.process.stdin.write('~.')
562 562 self.process.stdin.flush()
563 563
564 564
565 565
566 566 class SSHControllerLauncher(SSHLauncher):
567 567
568 568 program = List(ipcontroller_cmd_argv, config=True,
569 569 help="remote ipcontroller command.")
570 570 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
571 571 help="Command line arguments to ipcontroller.")
572 572
573 573
574 574 class SSHEngineLauncher(SSHLauncher):
575 575 program = List(ipengine_cmd_argv, config=True,
576 576 help="remote ipengine command.")
577 577 # Command line arguments for ipengine.
578 578 program_args = List(
579 579 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
580 580 help="Command line arguments to ipengine."
581 581 )
582 582
583 583 class SSHEngineSetLauncher(LocalEngineSetLauncher):
584 584 launcher_class = SSHEngineLauncher
585 585 engines = Dict(config=True,
586 586 help="""dict of engines to launch. This is a dict by hostname of ints,
587 587 corresponding to the number of engines to start on that host.""")
588 588
589 589 def start(self, n, profile_dir):
590 590 """Start engines by profile or profile_dir.
591 591 `n` is ignored, and the `engines` config property is used instead.
592 592 """
593 593
594 594 self.profile_dir = unicode(profile_dir)
595 595 dlist = []
596 596 for host, n in self.engines.iteritems():
597 597 if isinstance(n, (tuple, list)):
598 598 n, args = n
599 599 else:
600 600 args = copy.deepcopy(self.engine_args)
601 601
602 602 if '@' in host:
603 603 user,host = host.split('@',1)
604 604 else:
605 605 user=None
606 606 for i in range(n):
607 607 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
608 608
609 609 # Copy the engine args over to each engine launcher.
610 610 i
611 611 el.program_args = args
612 612 el.on_stop(self._notice_engine_stopped)
613 613 d = el.start(profile_dir, user=user, hostname=host)
614 614 if i==0:
615 615 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
616 616 self.launchers[host+str(i)] = el
617 617 dlist.append(d)
618 618 self.notify_start(dlist)
619 619 return dlist
620 620
621 621
622 622
623 623 #-----------------------------------------------------------------------------
624 624 # Windows HPC Server 2008 scheduler launchers
625 625 #-----------------------------------------------------------------------------
626 626
627 627
628 628 # This is only used on Windows.
629 629 def find_job_cmd():
630 630 if WINDOWS:
631 631 try:
632 632 return find_cmd('job')
633 633 except (FindCmdError, ImportError):
634 634 # ImportError will be raised if win32api is not installed
635 635 return 'job'
636 636 else:
637 637 return 'job'
638 638
639 639
640 640 class WindowsHPCLauncher(BaseLauncher):
641 641
642 642 job_id_regexp = Unicode(r'\d+', config=True,
643 643 help="""A regular expression used to get the job id from the output of the
644 644 submit_command. """
645 645 )
646 646 job_file_name = Unicode(u'ipython_job.xml', config=True,
647 647 help="The filename of the instantiated job script.")
648 648 # The full path to the instantiated job script. This gets made dynamically
649 649 # by combining the work_dir with the job_file_name.
650 650 job_file = Unicode(u'')
651 651 scheduler = Unicode('', config=True,
652 652 help="The hostname of the scheduler to submit the job to.")
653 653 job_cmd = Unicode(find_job_cmd(), config=True,
654 654 help="The command for submitting jobs.")
655 655
656 656 def __init__(self, work_dir=u'.', config=None, **kwargs):
657 657 super(WindowsHPCLauncher, self).__init__(
658 658 work_dir=work_dir, config=config, **kwargs
659 659 )
660 660
661 661 @property
662 662 def job_file(self):
663 663 return os.path.join(self.work_dir, self.job_file_name)
664 664
665 665 def write_job_file(self, n):
666 666 raise NotImplementedError("Implement write_job_file in a subclass.")
667 667
668 668 def find_args(self):
669 669 return [u'job.exe']
670 670
671 671 def parse_job_id(self, output):
672 672 """Take the output of the submit command and return the job id."""
673 673 m = re.search(self.job_id_regexp, output)
674 674 if m is not None:
675 675 job_id = m.group()
676 676 else:
677 677 raise LauncherError("Job id couldn't be determined: %s" % output)
678 678 self.job_id = job_id
679 679 self.log.info('Job started with job id: %r' % job_id)
680 680 return job_id
681 681
682 682 def start(self, n):
683 683 """Start n copies of the process using the Win HPC job scheduler."""
684 684 self.write_job_file(n)
685 685 args = [
686 686 'submit',
687 687 '/jobfile:%s' % self.job_file,
688 688 '/scheduler:%s' % self.scheduler
689 689 ]
690 690 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
691 691
692 692 output = check_output([self.job_cmd]+args,
693 693 env=os.environ,
694 694 cwd=self.work_dir,
695 695 stderr=STDOUT
696 696 )
697 697 job_id = self.parse_job_id(output)
698 698 self.notify_start(job_id)
699 699 return job_id
700 700
701 701 def stop(self):
702 702 args = [
703 703 'cancel',
704 704 self.job_id,
705 705 '/scheduler:%s' % self.scheduler
706 706 ]
707 707 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
708 708 try:
709 709 output = check_output([self.job_cmd]+args,
710 710 env=os.environ,
711 711 cwd=self.work_dir,
712 712 stderr=STDOUT
713 713 )
714 714 except:
715 715 output = 'The job already appears to be stoppped: %r' % self.job_id
716 716 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
717 717 return output
718 718
719 719
720 720 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
721 721
722 722 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
723 723 help="WinHPC xml job file.")
724 724 extra_args = List([], config=False,
725 725 help="extra args to pass to ipcontroller")
726 726
727 727 def write_job_file(self, n):
728 728 job = IPControllerJob(config=self.config)
729 729
730 730 t = IPControllerTask(config=self.config)
731 731 # The tasks work directory is *not* the actual work directory of
732 732 # the controller. It is used as the base path for the stdout/stderr
733 733 # files that the scheduler redirects to.
734 734 t.work_directory = self.profile_dir
735 735 # Add the profile_dir and from self.start().
736 736 t.controller_args.extend(self.extra_args)
737 737 job.add_task(t)
738 738
739 739 self.log.info("Writing job description file: %s" % self.job_file)
740 740 job.write(self.job_file)
741 741
742 742 @property
743 743 def job_file(self):
744 744 return os.path.join(self.profile_dir, self.job_file_name)
745 745
746 746 def start(self, profile_dir):
747 747 """Start the controller by profile_dir."""
748 748 self.extra_args = ['profile_dir=%s'%profile_dir]
749 749 self.profile_dir = unicode(profile_dir)
750 750 return super(WindowsHPCControllerLauncher, self).start(1)
751 751
752 752
753 753 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
754 754
755 755 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
756 756 help="jobfile for ipengines job")
757 757 extra_args = List([], config=False,
758 758 help="extra args to pas to ipengine")
759 759
760 760 def write_job_file(self, n):
761 761 job = IPEngineSetJob(config=self.config)
762 762
763 763 for i in range(n):
764 764 t = IPEngineTask(config=self.config)
765 765 # The tasks work directory is *not* the actual work directory of
766 766 # the engine. It is used as the base path for the stdout/stderr
767 767 # files that the scheduler redirects to.
768 768 t.work_directory = self.profile_dir
769 769 # Add the profile_dir and from self.start().
770 770 t.engine_args.extend(self.extra_args)
771 771 job.add_task(t)
772 772
773 773 self.log.info("Writing job description file: %s" % self.job_file)
774 774 job.write(self.job_file)
775 775
776 776 @property
777 777 def job_file(self):
778 778 return os.path.join(self.profile_dir, self.job_file_name)
779 779
780 780 def start(self, n, profile_dir):
781 781 """Start the controller by profile_dir."""
782 782 self.extra_args = ['profile_dir=%s'%profile_dir]
783 783 self.profile_dir = unicode(profile_dir)
784 784 return super(WindowsHPCEngineSetLauncher, self).start(n)
785 785
786 786
787 787 #-----------------------------------------------------------------------------
788 788 # Batch (PBS) system launchers
789 789 #-----------------------------------------------------------------------------
790 790
791 791 class BatchSystemLauncher(BaseLauncher):
792 792 """Launch an external process using a batch system.
793 793
794 794 This class is designed to work with UNIX batch systems like PBS, LSF,
795 795 GridEngine, etc. The overall model is that there are different commands
796 796 like qsub, qdel, etc. that handle the starting and stopping of the process.
797 797
798 798 This class also has the notion of a batch script. The ``batch_template``
799 799 attribute can be set to a string that is a template for the batch script.
800 800 This template is instantiated using string formatting. Thus the template can
801 801 use {n} fot the number of instances. Subclasses can add additional variables
802 802 to the template dict.
803 803 """
804 804
805 805 # Subclasses must fill these in. See PBSEngineSet
806 806 submit_command = List([''], config=True,
807 807 help="The name of the command line program used to submit jobs.")
808 808 delete_command = List([''], config=True,
809 809 help="The name of the command line program used to delete jobs.")
810 810 job_id_regexp = Unicode('', config=True,
811 811 help="""A regular expression used to get the job id from the output of the
812 812 submit_command.""")
813 813 batch_template = Unicode('', config=True,
814 814 help="The string that is the batch script template itself.")
815 815 batch_template_file = Unicode(u'', config=True,
816 816 help="The file that contains the batch template.")
817 817 batch_file_name = Unicode(u'batch_script', config=True,
818 818 help="The filename of the instantiated batch script.")
819 819 queue = Unicode(u'', config=True,
820 820 help="The PBS Queue.")
821 821
822 822 # not configurable, override in subclasses
823 823 # PBS Job Array regex
824 824 job_array_regexp = Unicode('')
825 825 job_array_template = Unicode('')
826 826 # PBS Queue regex
827 827 queue_regexp = Unicode('')
828 828 queue_template = Unicode('')
829 829 # The default batch template, override in subclasses
830 830 default_template = Unicode('')
831 831 # The full path to the instantiated batch script.
832 832 batch_file = Unicode(u'')
833 833 # the format dict used with batch_template:
834 834 context = Dict()
835 835 # the Formatter instance for rendering the templates:
836 836 formatter = Instance(EvalFormatter, (), {})
837 837
838 838
839 839 def find_args(self):
840 840 return self.submit_command + [self.batch_file]
841 841
842 842 def __init__(self, work_dir=u'.', config=None, **kwargs):
843 843 super(BatchSystemLauncher, self).__init__(
844 844 work_dir=work_dir, config=config, **kwargs
845 845 )
846 846 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
847 847
848 848 def parse_job_id(self, output):
849 849 """Take the output of the submit command and return the job id."""
850 850 m = re.search(self.job_id_regexp, output)
851 851 if m is not None:
852 852 job_id = m.group()
853 853 else:
854 854 raise LauncherError("Job id couldn't be determined: %s" % output)
855 855 self.job_id = job_id
856 856 self.log.info('Job submitted with job id: %r' % job_id)
857 857 return job_id
858 858
859 859 def write_batch_script(self, n):
860 860 """Instantiate and write the batch script to the work_dir."""
861 861 self.context['n'] = n
862 862 self.context['queue'] = self.queue
863 863 # first priority is batch_template if set
864 864 if self.batch_template_file and not self.batch_template:
865 865 # second priority is batch_template_file
866 866 with open(self.batch_template_file) as f:
867 867 self.batch_template = f.read()
868 868 if not self.batch_template:
869 869 # third (last) priority is default_template
870 870 self.batch_template = self.default_template
871 871
872 # add jobarray or queue lines to user-specified template
873 # note that this is *only* when user did not specify a template.
872 874 regex = re.compile(self.job_array_regexp)
873 875 # print regex.search(self.batch_template)
874 876 if not regex.search(self.batch_template):
875 877 self.log.info("adding job array settings to batch script")
876 878 firstline, rest = self.batch_template.split('\n',1)
877 879 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
878 880
879 881 regex = re.compile(self.queue_regexp)
880 882 # print regex.search(self.batch_template)
881 883 if self.queue and not regex.search(self.batch_template):
882 884 self.log.info("adding PBS queue settings to batch script")
883 885 firstline, rest = self.batch_template.split('\n',1)
884 886 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
885 887
886 888 script_as_string = self.formatter.format(self.batch_template, **self.context)
887 889 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
888 890
889 891 with open(self.batch_file, 'w') as f:
890 892 f.write(script_as_string)
891 893 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
892 894
893 895 def start(self, n, profile_dir):
894 896 """Start n copies of the process using a batch system."""
895 897 # Here we save profile_dir in the context so they
896 898 # can be used in the batch script template as {profile_dir}
897 899 self.context['profile_dir'] = profile_dir
898 900 self.profile_dir = unicode(profile_dir)
899 901 self.write_batch_script(n)
900 902 output = check_output(self.args, env=os.environ)
901 903
902 904 job_id = self.parse_job_id(output)
903 905 self.notify_start(job_id)
904 906 return job_id
905 907
906 908 def stop(self):
907 909 output = check_output(self.delete_command+[self.job_id], env=os.environ)
908 910 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
909 911 return output
910 912
911 913
912 914 class PBSLauncher(BatchSystemLauncher):
913 915 """A BatchSystemLauncher subclass for PBS."""
914 916
915 917 submit_command = List(['qsub'], config=True,
916 918 help="The PBS submit command ['qsub']")
917 919 delete_command = List(['qdel'], config=True,
918 920 help="The PBS delete command ['qsub']")
919 921 job_id_regexp = Unicode(r'\d+', config=True,
920 922 help="Regular expresion for identifying the job ID [r'\d+']")
921 923
922 924 batch_file = Unicode(u'')
923 925 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
924 926 job_array_template = Unicode('#PBS -t 1-{n}')
925 927 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
926 928 queue_template = Unicode('#PBS -q {queue}')
927 929
928 930
929 931 class PBSControllerLauncher(PBSLauncher):
930 932 """Launch a controller using PBS."""
931 933
932 934 batch_file_name = Unicode(u'pbs_controller', config=True,
933 935 help="batch file name for the controller job.")
934 936 default_template= Unicode("""#!/bin/sh
935 937 #PBS -V
936 938 #PBS -N ipcontroller
937 939 %s --log-to-file profile_dir={profile_dir}
938 940 """%(' '.join(ipcontroller_cmd_argv)))
939 941
940 942 def start(self, profile_dir):
941 943 """Start the controller by profile or profile_dir."""
942 944 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
943 945 return super(PBSControllerLauncher, self).start(1, profile_dir)
944 946
945 947
946 948 class PBSEngineSetLauncher(PBSLauncher):
947 949 """Launch Engines using PBS"""
948 950 batch_file_name = Unicode(u'pbs_engines', config=True,
949 951 help="batch file name for the engine(s) job.")
950 952 default_template= Unicode(u"""#!/bin/sh
951 953 #PBS -V
952 954 #PBS -N ipengine
953 955 %s profile_dir={profile_dir}
954 956 """%(' '.join(ipengine_cmd_argv)))
955 957
956 958 def start(self, n, profile_dir):
957 959 """Start n engines by profile or profile_dir."""
958 960 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
959 961 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
960 962
961 963 #SGE is very similar to PBS
962 964
963 965 class SGELauncher(PBSLauncher):
964 966 """Sun GridEngine is a PBS clone with slightly different syntax"""
965 967 job_array_regexp = Unicode('#\$\W+\-t')
966 968 job_array_template = Unicode('#$ -t 1-{n}')
967 969 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
968 970 queue_template = Unicode('#$ -q {queue}')
969 971
970 972 class SGEControllerLauncher(SGELauncher):
971 973 """Launch a controller using SGE."""
972 974
973 975 batch_file_name = Unicode(u'sge_controller', config=True,
974 976 help="batch file name for the ipontroller job.")
975 977 default_template= Unicode(u"""#$ -V
976 978 #$ -S /bin/sh
977 979 #$ -N ipcontroller
978 980 %s --log-to-file profile_dir={profile_dir}
979 981 """%(' '.join(ipcontroller_cmd_argv)))
980 982
981 983 def start(self, profile_dir):
982 984 """Start the controller by profile or profile_dir."""
983 985 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
984 986 return super(SGEControllerLauncher, self).start(1, profile_dir)
985 987
986 988 class SGEEngineSetLauncher(SGELauncher):
987 989 """Launch Engines with SGE"""
988 990 batch_file_name = Unicode(u'sge_engines', config=True,
989 991 help="batch file name for the engine(s) job.")
990 992 default_template = Unicode("""#$ -V
991 993 #$ -S /bin/sh
992 994 #$ -N ipengine
993 995 %s profile_dir={profile_dir}
994 996 """%(' '.join(ipengine_cmd_argv)))
995 997
996 998 def start(self, n, profile_dir):
997 999 """Start n engines by profile or profile_dir."""
998 1000 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
999 1001 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1000 1002
1001 1003
1002 1004 #-----------------------------------------------------------------------------
1003 1005 # A launcher for ipcluster itself!
1004 1006 #-----------------------------------------------------------------------------
1005 1007
1006 1008
1007 1009 class IPClusterLauncher(LocalProcessLauncher):
1008 1010 """Launch the ipcluster program in an external process."""
1009 1011
1010 1012 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1011 1013 help="Popen command for ipcluster")
1012 1014 ipcluster_args = List(
1013 1015 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1014 1016 help="Command line arguments to pass to ipcluster.")
1015 1017 ipcluster_subcommand = Unicode('start')
1016 1018 ipcluster_n = Int(2)
1017 1019
1018 1020 def find_args(self):
1019 1021 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1020 1022 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1021 1023
1022 1024 def start(self):
1023 1025 self.log.info("Starting ipcluster: %r" % self.args)
1024 1026 return super(IPClusterLauncher, self).start()
1025 1027
1026 1028 #-----------------------------------------------------------------------------
1027 1029 # Collections of launchers
1028 1030 #-----------------------------------------------------------------------------
1029 1031
1030 1032 local_launchers = [
1031 1033 LocalControllerLauncher,
1032 1034 LocalEngineLauncher,
1033 1035 LocalEngineSetLauncher,
1034 1036 ]
1035 1037 mpi_launchers = [
1036 1038 MPIExecLauncher,
1037 1039 MPIExecControllerLauncher,
1038 1040 MPIExecEngineSetLauncher,
1039 1041 ]
1040 1042 ssh_launchers = [
1041 1043 SSHLauncher,
1042 1044 SSHControllerLauncher,
1043 1045 SSHEngineLauncher,
1044 1046 SSHEngineSetLauncher,
1045 1047 ]
1046 1048 winhpc_launchers = [
1047 1049 WindowsHPCLauncher,
1048 1050 WindowsHPCControllerLauncher,
1049 1051 WindowsHPCEngineSetLauncher,
1050 1052 ]
1051 1053 pbs_launchers = [
1052 1054 PBSLauncher,
1053 1055 PBSControllerLauncher,
1054 1056 PBSEngineSetLauncher,
1055 1057 ]
1056 1058 sge_launchers = [
1057 1059 SGELauncher,
1058 1060 SGEControllerLauncher,
1059 1061 SGEEngineSetLauncher,
1060 1062 ]
1061 1063 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1062 1064 + pbs_launchers + sge_launchers
1063 1065
General Comments 0
You need to be logged in to leave comments. Login now