##// END OF EJS Templates
fix controller_args -> engine_args typo in WinHPCEngine
MinRK -
Show More
@@ -1,1184 +1,1184
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import re
26 26 import stat
27 27 import time
28 28
29 29 # signal imports, handling various platforms, versions
30 30
31 31 from signal import SIGINT, SIGTERM
32 32 try:
33 33 from signal import SIGKILL
34 34 except ImportError:
35 35 # Windows
36 36 SIGKILL=SIGTERM
37 37
38 38 try:
39 39 # Windows >= 2.7, 3.2
40 40 from signal import CTRL_C_EVENT as SIGINT
41 41 except ImportError:
42 42 pass
43 43
44 44 from subprocess import Popen, PIPE, STDOUT
45 45 try:
46 46 from subprocess import check_output
47 47 except ImportError:
48 48 # pre-2.7, define check_output with Popen
49 49 def check_output(*args, **kwargs):
50 50 kwargs.update(dict(stdout=PIPE))
51 51 p = Popen(*args, **kwargs)
52 52 out,err = p.communicate()
53 53 return out
54 54
55 55 from zmq.eventloop import ioloop
56 56
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 60 from IPython.utils.traitlets import (
61 61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 62 )
63 63 from IPython.utils.path import get_ipython_module_path
64 64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65 65
66 66 from .win32support import forward_read_events
67 67
68 68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 69
70 70 WINDOWS = os.name == 'nt'
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Paths to the kernel apps
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 78 'IPython.parallel.apps.ipclusterapp'
79 79 ))
80 80
81 81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 82 'IPython.parallel.apps.ipengineapp'
83 83 ))
84 84
85 85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 86 'IPython.parallel.apps.ipcontrollerapp'
87 87 ))
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # Base launchers and errors
91 91 #-----------------------------------------------------------------------------
92 92
93 93
94 94 class LauncherError(Exception):
95 95 pass
96 96
97 97
98 98 class ProcessStateError(LauncherError):
99 99 pass
100 100
101 101
102 102 class UnknownStatus(LauncherError):
103 103 pass
104 104
105 105
106 106 class BaseLauncher(LoggingConfigurable):
107 107 """An asbtraction for starting, stopping and signaling a process."""
108 108
109 109 # In all of the launchers, the work_dir is where child processes will be
110 110 # run. This will usually be the profile_dir, but may not be. any work_dir
111 111 # passed into the __init__ method will override the config value.
112 112 # This should not be used to set the work_dir for the actual engine
113 113 # and controller. Instead, use their own config files or the
114 114 # controller_args, engine_args attributes of the launchers to add
115 115 # the work_dir option.
116 116 work_dir = Unicode(u'.')
117 117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 118
119 119 start_data = Any()
120 120 stop_data = Any()
121 121
122 122 def _loop_default(self):
123 123 return ioloop.IOLoop.instance()
124 124
125 125 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 127 self.state = 'before' # can be before, running, after
128 128 self.stop_callbacks = []
129 129 self.start_data = None
130 130 self.stop_data = None
131 131
132 132 @property
133 133 def args(self):
134 134 """A list of cmd and args that will be used to start the process.
135 135
136 136 This is what is passed to :func:`spawnProcess` and the first element
137 137 will be the process name.
138 138 """
139 139 return self.find_args()
140 140
141 141 def find_args(self):
142 142 """The ``.args`` property calls this to find the args list.
143 143
144 144 Subcommand should implement this to construct the cmd and args.
145 145 """
146 146 raise NotImplementedError('find_args must be implemented in a subclass')
147 147
148 148 @property
149 149 def arg_str(self):
150 150 """The string form of the program arguments."""
151 151 return ' '.join(self.args)
152 152
153 153 @property
154 154 def running(self):
155 155 """Am I running."""
156 156 if self.state == 'running':
157 157 return True
158 158 else:
159 159 return False
160 160
161 161 def start(self):
162 162 """Start the process."""
163 163 raise NotImplementedError('start must be implemented in a subclass')
164 164
165 165 def stop(self):
166 166 """Stop the process and notify observers of stopping.
167 167
168 168 This method will return None immediately.
169 169 To observe the actual process stopping, see :meth:`on_stop`.
170 170 """
171 171 raise NotImplementedError('stop must be implemented in a subclass')
172 172
173 173 def on_stop(self, f):
174 174 """Register a callback to be called with this Launcher's stop_data
175 175 when the process actually finishes.
176 176 """
177 177 if self.state=='after':
178 178 return f(self.stop_data)
179 179 else:
180 180 self.stop_callbacks.append(f)
181 181
182 182 def notify_start(self, data):
183 183 """Call this to trigger startup actions.
184 184
185 185 This logs the process startup and sets the state to 'running'. It is
186 186 a pass-through so it can be used as a callback.
187 187 """
188 188
189 189 self.log.info('Process %r started: %r' % (self.args[0], data))
190 190 self.start_data = data
191 191 self.state = 'running'
192 192 return data
193 193
194 194 def notify_stop(self, data):
195 195 """Call this to trigger process stop actions.
196 196
197 197 This logs the process stopping and sets the state to 'after'. Call
198 198 this to trigger callbacks registered via :meth:`on_stop`."""
199 199
200 200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
201 201 self.stop_data = data
202 202 self.state = 'after'
203 203 for i in range(len(self.stop_callbacks)):
204 204 d = self.stop_callbacks.pop()
205 205 d(data)
206 206 return data
207 207
208 208 def signal(self, sig):
209 209 """Signal the process.
210 210
211 211 Parameters
212 212 ----------
213 213 sig : str or int
214 214 'KILL', 'INT', etc., or any signal number
215 215 """
216 216 raise NotImplementedError('signal must be implemented in a subclass')
217 217
218 218 class ClusterAppMixin(HasTraits):
219 219 """MixIn for cluster args as traits"""
220 220 cluster_args = List([])
221 221 profile_dir=Unicode('')
222 222 cluster_id=Unicode('')
223 223 def _profile_dir_changed(self, name, old, new):
224 224 self.cluster_args = []
225 225 if self.profile_dir:
226 226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 227 if self.cluster_id:
228 228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 229 _cluster_id_changed = _profile_dir_changed
230 230
231 231 class ControllerMixin(ClusterAppMixin):
232 232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 233 help="""Popen command to launch ipcontroller.""")
234 234 # Command line arguments to ipcontroller.
235 235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 236 help="""command-line args to pass to ipcontroller""")
237 237
238 238 class EngineMixin(ClusterAppMixin):
239 239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 240 help="""command to launch the Engine.""")
241 241 # Command line arguments for ipengine.
242 242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 243 help="command-line arguments to pass to ipengine"
244 244 )
245 245
246 246 #-----------------------------------------------------------------------------
247 247 # Local process launchers
248 248 #-----------------------------------------------------------------------------
249 249
250 250
251 251 class LocalProcessLauncher(BaseLauncher):
252 252 """Start and stop an external process in an asynchronous manner.
253 253
254 254 This will launch the external process with a working directory of
255 255 ``self.work_dir``.
256 256 """
257 257
258 258 # This is used to to construct self.args, which is passed to
259 259 # spawnProcess.
260 260 cmd_and_args = List([])
261 261 poll_frequency = Integer(100) # in ms
262 262
263 263 def __init__(self, work_dir=u'.', config=None, **kwargs):
264 264 super(LocalProcessLauncher, self).__init__(
265 265 work_dir=work_dir, config=config, **kwargs
266 266 )
267 267 self.process = None
268 268 self.poller = None
269 269
270 270 def find_args(self):
271 271 return self.cmd_and_args
272 272
273 273 def start(self):
274 274 if self.state == 'before':
275 275 self.process = Popen(self.args,
276 276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
277 277 env=os.environ,
278 278 cwd=self.work_dir
279 279 )
280 280 if WINDOWS:
281 281 self.stdout = forward_read_events(self.process.stdout)
282 282 self.stderr = forward_read_events(self.process.stderr)
283 283 else:
284 284 self.stdout = self.process.stdout.fileno()
285 285 self.stderr = self.process.stderr.fileno()
286 286 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
287 287 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
288 288 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
289 289 self.poller.start()
290 290 self.notify_start(self.process.pid)
291 291 else:
292 292 s = 'The process was already started and has state: %r' % self.state
293 293 raise ProcessStateError(s)
294 294
295 295 def stop(self):
296 296 return self.interrupt_then_kill()
297 297
298 298 def signal(self, sig):
299 299 if self.state == 'running':
300 300 if WINDOWS and sig != SIGINT:
301 301 # use Windows tree-kill for better child cleanup
302 302 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
303 303 else:
304 304 self.process.send_signal(sig)
305 305
306 306 def interrupt_then_kill(self, delay=2.0):
307 307 """Send INT, wait a delay and then send KILL."""
308 308 try:
309 309 self.signal(SIGINT)
310 310 except Exception:
311 311 self.log.debug("interrupt failed")
312 312 pass
313 313 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
314 314 self.killer.start()
315 315
316 316 # callbacks, etc:
317 317
318 318 def handle_stdout(self, fd, events):
319 319 if WINDOWS:
320 320 line = self.stdout.recv()
321 321 else:
322 322 line = self.process.stdout.readline()
323 323 # a stopped process will be readable but return empty strings
324 324 if line:
325 325 self.log.info(line[:-1])
326 326 else:
327 327 self.poll()
328 328
329 329 def handle_stderr(self, fd, events):
330 330 if WINDOWS:
331 331 line = self.stderr.recv()
332 332 else:
333 333 line = self.process.stderr.readline()
334 334 # a stopped process will be readable but return empty strings
335 335 if line:
336 336 self.log.error(line[:-1])
337 337 else:
338 338 self.poll()
339 339
340 340 def poll(self):
341 341 status = self.process.poll()
342 342 if status is not None:
343 343 self.poller.stop()
344 344 self.loop.remove_handler(self.stdout)
345 345 self.loop.remove_handler(self.stderr)
346 346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
347 347 return status
348 348
349 349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
350 350 """Launch a controller as a regular external process."""
351 351
352 352 def find_args(self):
353 353 return self.controller_cmd + self.cluster_args + self.controller_args
354 354
355 355 def start(self):
356 356 """Start the controller by profile_dir."""
357 357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
358 358 return super(LocalControllerLauncher, self).start()
359 359
360 360
361 361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
362 362 """Launch a single engine as a regular externall process."""
363 363
364 364 def find_args(self):
365 365 return self.engine_cmd + self.cluster_args + self.engine_args
366 366
367 367
368 368 class LocalEngineSetLauncher(LocalEngineLauncher):
369 369 """Launch a set of engines as regular external processes."""
370 370
371 371 delay = CFloat(0.1, config=True,
372 372 help="""delay (in seconds) between starting each engine after the first.
373 373 This can help force the engines to get their ids in order, or limit
374 374 process flood when starting many engines."""
375 375 )
376 376
377 377 # launcher class
378 378 launcher_class = LocalEngineLauncher
379 379
380 380 launchers = Dict()
381 381 stop_data = Dict()
382 382
383 383 def __init__(self, work_dir=u'.', config=None, **kwargs):
384 384 super(LocalEngineSetLauncher, self).__init__(
385 385 work_dir=work_dir, config=config, **kwargs
386 386 )
387 387 self.stop_data = {}
388 388
389 389 def start(self, n):
390 390 """Start n engines by profile or profile_dir."""
391 391 dlist = []
392 392 for i in range(n):
393 393 if i > 0:
394 394 time.sleep(self.delay)
395 395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 397 )
398 398
399 399 # Copy the engine args over to each engine launcher.
400 400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
401 401 el.engine_args = copy.deepcopy(self.engine_args)
402 402 el.on_stop(self._notice_engine_stopped)
403 403 d = el.start()
404 404 if i==0:
405 405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
406 406 self.launchers[i] = el
407 407 dlist.append(d)
408 408 self.notify_start(dlist)
409 409 return dlist
410 410
411 411 def find_args(self):
412 412 return ['engine set']
413 413
414 414 def signal(self, sig):
415 415 dlist = []
416 416 for el in self.launchers.itervalues():
417 417 d = el.signal(sig)
418 418 dlist.append(d)
419 419 return dlist
420 420
421 421 def interrupt_then_kill(self, delay=1.0):
422 422 dlist = []
423 423 for el in self.launchers.itervalues():
424 424 d = el.interrupt_then_kill(delay)
425 425 dlist.append(d)
426 426 return dlist
427 427
428 428 def stop(self):
429 429 return self.interrupt_then_kill()
430 430
431 431 def _notice_engine_stopped(self, data):
432 432 pid = data['pid']
433 433 for idx,el in self.launchers.iteritems():
434 434 if el.process.pid == pid:
435 435 break
436 436 self.launchers.pop(idx)
437 437 self.stop_data[idx] = data
438 438 if not self.launchers:
439 439 self.notify_stop(self.stop_data)
440 440
441 441
442 442 #-----------------------------------------------------------------------------
443 443 # MPIExec launchers
444 444 #-----------------------------------------------------------------------------
445 445
446 446
447 447 class MPIExecLauncher(LocalProcessLauncher):
448 448 """Launch an external process using mpiexec."""
449 449
450 450 mpi_cmd = List(['mpiexec'], config=True,
451 451 help="The mpiexec command to use in starting the process."
452 452 )
453 453 mpi_args = List([], config=True,
454 454 help="The command line arguments to pass to mpiexec."
455 455 )
456 456 program = List(['date'],
457 457 help="The program to start via mpiexec.")
458 458 program_args = List([],
459 459 help="The command line argument to the program."
460 460 )
461 461 n = Integer(1)
462 462
463 463 def find_args(self):
464 464 """Build self.args using all the fields."""
465 465 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
466 466 self.program + self.program_args
467 467
468 468 def start(self, n):
469 469 """Start n instances of the program using mpiexec."""
470 470 self.n = n
471 471 return super(MPIExecLauncher, self).start()
472 472
473 473
474 474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
475 475 """Launch a controller using mpiexec."""
476 476
477 477 # alias back to *non-configurable* program[_args] for use in find_args()
478 478 # this way all Controller/EngineSetLaunchers have the same form, rather
479 479 # than *some* having `program_args` and others `controller_args`
480 480 @property
481 481 def program(self):
482 482 return self.controller_cmd
483 483
484 484 @property
485 485 def program_args(self):
486 486 return self.cluster_args + self.controller_args
487 487
488 488 def start(self):
489 489 """Start the controller by profile_dir."""
490 490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
491 491 return super(MPIExecControllerLauncher, self).start(1)
492 492
493 493
494 494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 495 """Launch engines using mpiexec"""
496 496
497 497 # alias back to *non-configurable* program[_args] for use in find_args()
498 498 # this way all Controller/EngineSetLaunchers have the same form, rather
499 499 # than *some* having `program_args` and others `controller_args`
500 500 @property
501 501 def program(self):
502 502 return self.engine_cmd
503 503
504 504 @property
505 505 def program_args(self):
506 506 return self.cluster_args + self.engine_args
507 507
508 508 def start(self, n):
509 509 """Start n engines by profile or profile_dir."""
510 510 self.n = n
511 511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 512 return super(MPIExecEngineSetLauncher, self).start(n)
513 513
514 514 #-----------------------------------------------------------------------------
515 515 # SSH launchers
516 516 #-----------------------------------------------------------------------------
517 517
518 518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
519 519
520 520 class SSHLauncher(LocalProcessLauncher):
521 521 """A minimal launcher for ssh.
522 522
523 523 To be useful this will probably have to be extended to use the ``sshx``
524 524 idea for environment variables. There could be other things this needs
525 525 as well.
526 526 """
527 527
528 528 ssh_cmd = List(['ssh'], config=True,
529 529 help="command for starting ssh")
530 530 ssh_args = List(['-tt'], config=True,
531 531 help="args to pass to ssh")
532 532 program = List(['date'],
533 533 help="Program to launch via ssh")
534 534 program_args = List([],
535 535 help="args to pass to remote program")
536 536 hostname = Unicode('', config=True,
537 537 help="hostname on which to launch the program")
538 538 user = Unicode('', config=True,
539 539 help="username for ssh")
540 540 location = Unicode('', config=True,
541 541 help="user@hostname location for ssh in one setting")
542 542
543 543 def _hostname_changed(self, name, old, new):
544 544 if self.user:
545 545 self.location = u'%s@%s' % (self.user, new)
546 546 else:
547 547 self.location = new
548 548
549 549 def _user_changed(self, name, old, new):
550 550 self.location = u'%s@%s' % (new, self.hostname)
551 551
552 552 def find_args(self):
553 553 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 554 self.program + self.program_args
555 555
556 556 def start(self, hostname=None, user=None):
557 557 if hostname is not None:
558 558 self.hostname = hostname
559 559 if user is not None:
560 560 self.user = user
561 561
562 562 return super(SSHLauncher, self).start()
563 563
564 564 def signal(self, sig):
565 565 if self.state == 'running':
566 566 # send escaped ssh connection-closer
567 567 self.process.stdin.write('~.')
568 568 self.process.stdin.flush()
569 569
570 570
571 571
572 572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
573 573
574 574 # alias back to *non-configurable* program[_args] for use in find_args()
575 575 # this way all Controller/EngineSetLaunchers have the same form, rather
576 576 # than *some* having `program_args` and others `controller_args`
577 577 @property
578 578 def program(self):
579 579 return self.controller_cmd
580 580
581 581 @property
582 582 def program_args(self):
583 583 return self.cluster_args + self.controller_args
584 584
585 585
586 586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
587 587
588 588 # alias back to *non-configurable* program[_args] for use in find_args()
589 589 # this way all Controller/EngineSetLaunchers have the same form, rather
590 590 # than *some* having `program_args` and others `controller_args`
591 591 @property
592 592 def program(self):
593 593 return self.engine_cmd
594 594
595 595 @property
596 596 def program_args(self):
597 597 return self.cluster_args + self.engine_args
598 598
599 599
600 600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
601 601 launcher_class = SSHEngineLauncher
602 602 engines = Dict(config=True,
603 603 help="""dict of engines to launch. This is a dict by hostname of ints,
604 604 corresponding to the number of engines to start on that host.""")
605 605
606 606 def start(self, n):
607 607 """Start engines by profile or profile_dir.
608 608 `n` is ignored, and the `engines` config property is used instead.
609 609 """
610 610
611 611 dlist = []
612 612 for host, n in self.engines.iteritems():
613 613 if isinstance(n, (tuple, list)):
614 614 n, args = n
615 615 else:
616 616 args = copy.deepcopy(self.engine_args)
617 617
618 618 if '@' in host:
619 619 user,host = host.split('@',1)
620 620 else:
621 621 user=None
622 622 for i in range(n):
623 623 if i > 0:
624 624 time.sleep(self.delay)
625 625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 627 )
628 628
629 629 # Copy the engine args over to each engine launcher.
630 630 el.engine_cmd = self.engine_cmd
631 631 el.engine_args = args
632 632 el.on_stop(self._notice_engine_stopped)
633 633 d = el.start(user=user, hostname=host)
634 634 if i==0:
635 635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
636 636 self.launchers[ "%s/%i" % (host,i) ] = el
637 637 dlist.append(d)
638 638 self.notify_start(dlist)
639 639 return dlist
640 640
641 641
642 642
643 643 #-----------------------------------------------------------------------------
644 644 # Windows HPC Server 2008 scheduler launchers
645 645 #-----------------------------------------------------------------------------
646 646
647 647
648 648 # This is only used on Windows.
649 649 def find_job_cmd():
650 650 if WINDOWS:
651 651 try:
652 652 return find_cmd('job')
653 653 except (FindCmdError, ImportError):
654 654 # ImportError will be raised if win32api is not installed
655 655 return 'job'
656 656 else:
657 657 return 'job'
658 658
659 659
660 660 class WindowsHPCLauncher(BaseLauncher):
661 661
662 662 job_id_regexp = Unicode(r'\d+', config=True,
663 663 help="""A regular expression used to get the job id from the output of the
664 664 submit_command. """
665 665 )
666 666 job_file_name = Unicode(u'ipython_job.xml', config=True,
667 667 help="The filename of the instantiated job script.")
668 668 # The full path to the instantiated job script. This gets made dynamically
669 669 # by combining the work_dir with the job_file_name.
670 670 job_file = Unicode(u'')
671 671 scheduler = Unicode('', config=True,
672 672 help="The hostname of the scheduler to submit the job to.")
673 673 job_cmd = Unicode(find_job_cmd(), config=True,
674 674 help="The command for submitting jobs.")
675 675
676 676 def __init__(self, work_dir=u'.', config=None, **kwargs):
677 677 super(WindowsHPCLauncher, self).__init__(
678 678 work_dir=work_dir, config=config, **kwargs
679 679 )
680 680
681 681 @property
682 682 def job_file(self):
683 683 return os.path.join(self.work_dir, self.job_file_name)
684 684
685 685 def write_job_file(self, n):
686 686 raise NotImplementedError("Implement write_job_file in a subclass.")
687 687
688 688 def find_args(self):
689 689 return [u'job.exe']
690 690
691 691 def parse_job_id(self, output):
692 692 """Take the output of the submit command and return the job id."""
693 693 m = re.search(self.job_id_regexp, output)
694 694 if m is not None:
695 695 job_id = m.group()
696 696 else:
697 697 raise LauncherError("Job id couldn't be determined: %s" % output)
698 698 self.job_id = job_id
699 699 self.log.info('Job started with job id: %r' % job_id)
700 700 return job_id
701 701
702 702 def start(self, n):
703 703 """Start n copies of the process using the Win HPC job scheduler."""
704 704 self.write_job_file(n)
705 705 args = [
706 706 'submit',
707 707 '/jobfile:%s' % self.job_file,
708 708 '/scheduler:%s' % self.scheduler
709 709 ]
710 710 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
711 711
712 712 output = check_output([self.job_cmd]+args,
713 713 env=os.environ,
714 714 cwd=self.work_dir,
715 715 stderr=STDOUT
716 716 )
717 717 job_id = self.parse_job_id(output)
718 718 self.notify_start(job_id)
719 719 return job_id
720 720
721 721 def stop(self):
722 722 args = [
723 723 'cancel',
724 724 self.job_id,
725 725 '/scheduler:%s' % self.scheduler
726 726 ]
727 727 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
728 728 try:
729 729 output = check_output([self.job_cmd]+args,
730 730 env=os.environ,
731 731 cwd=self.work_dir,
732 732 stderr=STDOUT
733 733 )
734 734 except:
735 735 output = 'The job already appears to be stoppped: %r' % self.job_id
736 736 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
737 737 return output
738 738
739 739
740 740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
741 741
742 742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
743 743 help="WinHPC xml job file.")
744 744 controller_args = List([], config=False,
745 745 help="extra args to pass to ipcontroller")
746 746
747 747 def write_job_file(self, n):
748 748 job = IPControllerJob(config=self.config)
749 749
750 750 t = IPControllerTask(config=self.config)
751 751 # The tasks work directory is *not* the actual work directory of
752 752 # the controller. It is used as the base path for the stdout/stderr
753 753 # files that the scheduler redirects to.
754 754 t.work_directory = self.profile_dir
755 755 # Add the profile_dir and from self.start().
756 756 t.controller_args.extend(self.cluster_args)
757 757 t.controller_args.extend(self.controller_args)
758 758 job.add_task(t)
759 759
760 760 self.log.info("Writing job description file: %s" % self.job_file)
761 761 job.write(self.job_file)
762 762
763 763 @property
764 764 def job_file(self):
765 765 return os.path.join(self.profile_dir, self.job_file_name)
766 766
767 767 def start(self):
768 768 """Start the controller by profile_dir."""
769 769 return super(WindowsHPCControllerLauncher, self).start(1)
770 770
771 771
772 772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
773 773
774 774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
775 775 help="jobfile for ipengines job")
776 776 engine_args = List([], config=False,
777 777 help="extra args to pas to ipengine")
778 778
779 779 def write_job_file(self, n):
780 780 job = IPEngineSetJob(config=self.config)
781 781
782 782 for i in range(n):
783 783 t = IPEngineTask(config=self.config)
784 784 # The tasks work directory is *not* the actual work directory of
785 785 # the engine. It is used as the base path for the stdout/stderr
786 786 # files that the scheduler redirects to.
787 787 t.work_directory = self.profile_dir
788 788 # Add the profile_dir and from self.start().
789 t.controller_args.extend(self.cluster_args)
790 t.controller_args.extend(self.engine_args)
789 t.engine_args.extend(self.cluster_args)
790 t.engine_args.extend(self.engine_args)
791 791 job.add_task(t)
792 792
793 793 self.log.info("Writing job description file: %s" % self.job_file)
794 794 job.write(self.job_file)
795 795
796 796 @property
797 797 def job_file(self):
798 798 return os.path.join(self.profile_dir, self.job_file_name)
799 799
800 800 def start(self, n):
801 801 """Start the controller by profile_dir."""
802 802 return super(WindowsHPCEngineSetLauncher, self).start(n)
803 803
804 804
805 805 #-----------------------------------------------------------------------------
806 806 # Batch (PBS) system launchers
807 807 #-----------------------------------------------------------------------------
808 808
809 809 class BatchClusterAppMixin(ClusterAppMixin):
810 810 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
811 811 def _profile_dir_changed(self, name, old, new):
812 812 self.context[name] = new
813 813 _cluster_id_changed = _profile_dir_changed
814 814
815 815 def _profile_dir_default(self):
816 816 self.context['profile_dir'] = ''
817 817 return ''
818 818 def _cluster_id_default(self):
819 819 self.context['cluster_id'] = ''
820 820 return ''
821 821
822 822
823 823 class BatchSystemLauncher(BaseLauncher):
824 824 """Launch an external process using a batch system.
825 825
826 826 This class is designed to work with UNIX batch systems like PBS, LSF,
827 827 GridEngine, etc. The overall model is that there are different commands
828 828 like qsub, qdel, etc. that handle the starting and stopping of the process.
829 829
830 830 This class also has the notion of a batch script. The ``batch_template``
831 831 attribute can be set to a string that is a template for the batch script.
832 832 This template is instantiated using string formatting. Thus the template can
833 833 use {n} fot the number of instances. Subclasses can add additional variables
834 834 to the template dict.
835 835 """
836 836
837 837 # Subclasses must fill these in. See PBSEngineSet
838 838 submit_command = List([''], config=True,
839 839 help="The name of the command line program used to submit jobs.")
840 840 delete_command = List([''], config=True,
841 841 help="The name of the command line program used to delete jobs.")
842 842 job_id_regexp = Unicode('', config=True,
843 843 help="""A regular expression used to get the job id from the output of the
844 844 submit_command.""")
845 845 batch_template = Unicode('', config=True,
846 846 help="The string that is the batch script template itself.")
847 847 batch_template_file = Unicode(u'', config=True,
848 848 help="The file that contains the batch template.")
849 849 batch_file_name = Unicode(u'batch_script', config=True,
850 850 help="The filename of the instantiated batch script.")
851 851 queue = Unicode(u'', config=True,
852 852 help="The PBS Queue.")
853 853
854 854 def _queue_changed(self, name, old, new):
855 855 self.context[name] = new
856 856
857 857 n = Integer(1)
858 858 _n_changed = _queue_changed
859 859
860 860 # not configurable, override in subclasses
861 861 # PBS Job Array regex
862 862 job_array_regexp = Unicode('')
863 863 job_array_template = Unicode('')
864 864 # PBS Queue regex
865 865 queue_regexp = Unicode('')
866 866 queue_template = Unicode('')
867 867 # The default batch template, override in subclasses
868 868 default_template = Unicode('')
869 869 # The full path to the instantiated batch script.
870 870 batch_file = Unicode(u'')
871 871 # the format dict used with batch_template:
872 872 context = Dict()
873 873 def _context_default(self):
874 874 """load the default context with the default values for the basic keys
875 875
876 876 because the _trait_changed methods only load the context if they
877 877 are set to something other than the default value.
878 878 """
879 879 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
880 880
881 881 # the Formatter instance for rendering the templates:
882 882 formatter = Instance(EvalFormatter, (), {})
883 883
884 884
885 885 def find_args(self):
886 886 return self.submit_command + [self.batch_file]
887 887
888 888 def __init__(self, work_dir=u'.', config=None, **kwargs):
889 889 super(BatchSystemLauncher, self).__init__(
890 890 work_dir=work_dir, config=config, **kwargs
891 891 )
892 892 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
893 893
894 894 def parse_job_id(self, output):
895 895 """Take the output of the submit command and return the job id."""
896 896 m = re.search(self.job_id_regexp, output)
897 897 if m is not None:
898 898 job_id = m.group()
899 899 else:
900 900 raise LauncherError("Job id couldn't be determined: %s" % output)
901 901 self.job_id = job_id
902 902 self.log.info('Job submitted with job id: %r' % job_id)
903 903 return job_id
904 904
905 905 def write_batch_script(self, n):
906 906 """Instantiate and write the batch script to the work_dir."""
907 907 self.n = n
908 908 # first priority is batch_template if set
909 909 if self.batch_template_file and not self.batch_template:
910 910 # second priority is batch_template_file
911 911 with open(self.batch_template_file) as f:
912 912 self.batch_template = f.read()
913 913 if not self.batch_template:
914 914 # third (last) priority is default_template
915 915 self.batch_template = self.default_template
916 916
917 917 # add jobarray or queue lines to user-specified template
918 918 # note that this is *only* when user did not specify a template.
919 919 regex = re.compile(self.job_array_regexp)
920 920 # print regex.search(self.batch_template)
921 921 if not regex.search(self.batch_template):
922 922 self.log.info("adding job array settings to batch script")
923 923 firstline, rest = self.batch_template.split('\n',1)
924 924 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
925 925
926 926 regex = re.compile(self.queue_regexp)
927 927 # print regex.search(self.batch_template)
928 928 if self.queue and not regex.search(self.batch_template):
929 929 self.log.info("adding PBS queue settings to batch script")
930 930 firstline, rest = self.batch_template.split('\n',1)
931 931 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
932 932
933 933 script_as_string = self.formatter.format(self.batch_template, **self.context)
934 934 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
935 935
936 936 with open(self.batch_file, 'w') as f:
937 937 f.write(script_as_string)
938 938 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
939 939
940 940 def start(self, n):
941 941 """Start n copies of the process using a batch system."""
942 942 # Here we save profile_dir in the context so they
943 943 # can be used in the batch script template as {profile_dir}
944 944 self.write_batch_script(n)
945 945 output = check_output(self.args, env=os.environ)
946 946
947 947 job_id = self.parse_job_id(output)
948 948 self.notify_start(job_id)
949 949 return job_id
950 950
951 951 def stop(self):
952 952 output = check_output(self.delete_command+[self.job_id], env=os.environ)
953 953 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
954 954 return output
955 955
956 956
957 957 class PBSLauncher(BatchSystemLauncher):
958 958 """A BatchSystemLauncher subclass for PBS."""
959 959
960 960 submit_command = List(['qsub'], config=True,
961 961 help="The PBS submit command ['qsub']")
962 962 delete_command = List(['qdel'], config=True,
963 963 help="The PBS delete command ['qsub']")
964 964 job_id_regexp = Unicode(r'\d+', config=True,
965 965 help="Regular expresion for identifying the job ID [r'\d+']")
966 966
967 967 batch_file = Unicode(u'')
968 968 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
969 969 job_array_template = Unicode('#PBS -t 1-{n}')
970 970 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
971 971 queue_template = Unicode('#PBS -q {queue}')
972 972
973 973
974 974 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
975 975 """Launch a controller using PBS."""
976 976
977 977 batch_file_name = Unicode(u'pbs_controller', config=True,
978 978 help="batch file name for the controller job.")
979 979 default_template= Unicode("""#!/bin/sh
980 980 #PBS -V
981 981 #PBS -N ipcontroller
982 982 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
983 983 """%(' '.join(ipcontroller_cmd_argv)))
984 984
985 985
986 986 def start(self):
987 987 """Start the controller by profile or profile_dir."""
988 988 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
989 989 return super(PBSControllerLauncher, self).start(1)
990 990
991 991
992 992 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
993 993 """Launch Engines using PBS"""
994 994 batch_file_name = Unicode(u'pbs_engines', config=True,
995 995 help="batch file name for the engine(s) job.")
996 996 default_template= Unicode(u"""#!/bin/sh
997 997 #PBS -V
998 998 #PBS -N ipengine
999 999 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1000 1000 """%(' '.join(ipengine_cmd_argv)))
1001 1001
1002 1002 def start(self, n):
1003 1003 """Start n engines by profile or profile_dir."""
1004 1004 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
1005 1005 return super(PBSEngineSetLauncher, self).start(n)
1006 1006
1007 1007 #SGE is very similar to PBS
1008 1008
1009 1009 class SGELauncher(PBSLauncher):
1010 1010 """Sun GridEngine is a PBS clone with slightly different syntax"""
1011 1011 job_array_regexp = Unicode('#\$\W+\-t')
1012 1012 job_array_template = Unicode('#$ -t 1-{n}')
1013 1013 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1014 1014 queue_template = Unicode('#$ -q {queue}')
1015 1015
1016 1016 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1017 1017 """Launch a controller using SGE."""
1018 1018
1019 1019 batch_file_name = Unicode(u'sge_controller', config=True,
1020 1020 help="batch file name for the ipontroller job.")
1021 1021 default_template= Unicode(u"""#$ -V
1022 1022 #$ -S /bin/sh
1023 1023 #$ -N ipcontroller
1024 1024 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1025 1025 """%(' '.join(ipcontroller_cmd_argv)))
1026 1026
1027 1027 def start(self):
1028 1028 """Start the controller by profile or profile_dir."""
1029 1029 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
1030 1030 return super(SGEControllerLauncher, self).start(1)
1031 1031
1032 1032 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1033 1033 """Launch Engines with SGE"""
1034 1034 batch_file_name = Unicode(u'sge_engines', config=True,
1035 1035 help="batch file name for the engine(s) job.")
1036 1036 default_template = Unicode("""#$ -V
1037 1037 #$ -S /bin/sh
1038 1038 #$ -N ipengine
1039 1039 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1040 1040 """%(' '.join(ipengine_cmd_argv)))
1041 1041
1042 1042 def start(self, n):
1043 1043 """Start n engines by profile or profile_dir."""
1044 1044 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1045 1045 return super(SGEEngineSetLauncher, self).start(n)
1046 1046
1047 1047
1048 1048 # LSF launchers
1049 1049
1050 1050 class LSFLauncher(BatchSystemLauncher):
1051 1051 """A BatchSystemLauncher subclass for LSF."""
1052 1052
1053 1053 submit_command = List(['bsub'], config=True,
1054 1054 help="The PBS submit command ['bsub']")
1055 1055 delete_command = List(['bkill'], config=True,
1056 1056 help="The PBS delete command ['bkill']")
1057 1057 job_id_regexp = Unicode(r'\d+', config=True,
1058 1058 help="Regular expresion for identifying the job ID [r'\d+']")
1059 1059
1060 1060 batch_file = Unicode(u'')
1061 1061 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1062 1062 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1063 1063 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1064 1064 queue_template = Unicode('#BSUB -q {queue}')
1065 1065
1066 1066 def start(self, n):
1067 1067 """Start n copies of the process using LSF batch system.
1068 1068 This cant inherit from the base class because bsub expects
1069 1069 to be piped a shell script in order to honor the #BSUB directives :
1070 1070 bsub < script
1071 1071 """
1072 1072 # Here we save profile_dir in the context so they
1073 1073 # can be used in the batch script template as {profile_dir}
1074 1074 self.write_batch_script(n)
1075 1075 #output = check_output(self.args, env=os.environ)
1076 1076 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1077 1077 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1078 1078 output,err = p.communicate()
1079 1079 job_id = self.parse_job_id(output)
1080 1080 self.notify_start(job_id)
1081 1081 return job_id
1082 1082
1083 1083
1084 1084 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1085 1085 """Launch a controller using LSF."""
1086 1086
1087 1087 batch_file_name = Unicode(u'lsf_controller', config=True,
1088 1088 help="batch file name for the controller job.")
1089 1089 default_template= Unicode("""#!/bin/sh
1090 1090 #BSUB -J ipcontroller
1091 1091 #BSUB -oo ipcontroller.o.%%J
1092 1092 #BSUB -eo ipcontroller.e.%%J
1093 1093 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1094 1094 """%(' '.join(ipcontroller_cmd_argv)))
1095 1095
1096 1096 def start(self):
1097 1097 """Start the controller by profile or profile_dir."""
1098 1098 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1099 1099 return super(LSFControllerLauncher, self).start(1)
1100 1100
1101 1101
1102 1102 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1103 1103 """Launch Engines using LSF"""
1104 1104 batch_file_name = Unicode(u'lsf_engines', config=True,
1105 1105 help="batch file name for the engine(s) job.")
1106 1106 default_template= Unicode(u"""#!/bin/sh
1107 1107 #BSUB -oo ipengine.o.%%J
1108 1108 #BSUB -eo ipengine.e.%%J
1109 1109 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1110 1110 """%(' '.join(ipengine_cmd_argv)))
1111 1111
1112 1112 def start(self, n):
1113 1113 """Start n engines by profile or profile_dir."""
1114 1114 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1115 1115 return super(LSFEngineSetLauncher, self).start(n)
1116 1116
1117 1117
1118 1118 #-----------------------------------------------------------------------------
1119 1119 # A launcher for ipcluster itself!
1120 1120 #-----------------------------------------------------------------------------
1121 1121
1122 1122
1123 1123 class IPClusterLauncher(LocalProcessLauncher):
1124 1124 """Launch the ipcluster program in an external process."""
1125 1125
1126 1126 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1127 1127 help="Popen command for ipcluster")
1128 1128 ipcluster_args = List(
1129 1129 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1130 1130 help="Command line arguments to pass to ipcluster.")
1131 1131 ipcluster_subcommand = Unicode('start')
1132 1132 ipcluster_n = Integer(2)
1133 1133
1134 1134 def find_args(self):
1135 1135 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1136 1136 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1137 1137
1138 1138 def start(self):
1139 1139 self.log.info("Starting ipcluster: %r" % self.args)
1140 1140 return super(IPClusterLauncher, self).start()
1141 1141
1142 1142 #-----------------------------------------------------------------------------
1143 1143 # Collections of launchers
1144 1144 #-----------------------------------------------------------------------------
1145 1145
1146 1146 local_launchers = [
1147 1147 LocalControllerLauncher,
1148 1148 LocalEngineLauncher,
1149 1149 LocalEngineSetLauncher,
1150 1150 ]
1151 1151 mpi_launchers = [
1152 1152 MPIExecLauncher,
1153 1153 MPIExecControllerLauncher,
1154 1154 MPIExecEngineSetLauncher,
1155 1155 ]
1156 1156 ssh_launchers = [
1157 1157 SSHLauncher,
1158 1158 SSHControllerLauncher,
1159 1159 SSHEngineLauncher,
1160 1160 SSHEngineSetLauncher,
1161 1161 ]
1162 1162 winhpc_launchers = [
1163 1163 WindowsHPCLauncher,
1164 1164 WindowsHPCControllerLauncher,
1165 1165 WindowsHPCEngineSetLauncher,
1166 1166 ]
1167 1167 pbs_launchers = [
1168 1168 PBSLauncher,
1169 1169 PBSControllerLauncher,
1170 1170 PBSEngineSetLauncher,
1171 1171 ]
1172 1172 sge_launchers = [
1173 1173 SGELauncher,
1174 1174 SGEControllerLauncher,
1175 1175 SGEEngineSetLauncher,
1176 1176 ]
1177 1177 lsf_launchers = [
1178 1178 LSFLauncher,
1179 1179 LSFControllerLauncher,
1180 1180 LSFEngineSetLauncher,
1181 1181 ]
1182 1182 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1183 1183 + pbs_launchers + sge_launchers + lsf_launchers
1184 1184
General Comments 0
You need to be logged in to leave comments. Login now