##// END OF EJS Templates
Tidied up queue configuration (no support in Condor)
James Booth -
Show More
@@ -1,1463 +1,1462 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
46 46 try:
47 47 from subprocess import check_output
48 48 except ImportError:
49 49 # pre-2.7, define check_output with Popen
50 50 def check_output(*args, **kwargs):
51 51 kwargs.update(dict(stdout=PIPE))
52 52 p = Popen(*args, **kwargs)
53 53 out,err = p.communicate()
54 54 return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.encoding import DEFAULT_ENCODING
65 65 from IPython.utils.path import get_home_dir
66 66 from IPython.utils.process import find_cmd, FindCmdError
67 67
68 68 from .win32support import forward_read_events
69 69
70 70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 71
72 72 WINDOWS = os.name == 'nt'
73 73
74 74 #-----------------------------------------------------------------------------
75 75 # Paths to the kernel apps
76 76 #-----------------------------------------------------------------------------
77 77
# Template for launching any of the parallel apps through the current
# Python interpreter with `-c`.
cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"

ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]

ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]

ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]

# HTCondor frustratingly destroys sys.executable when launching remote processes
# thus Python will default back to the system module search paths which is
# ridiculously fragile (not to mention destructive for virtualenvs).
# however, if we use the ip{cluster, engine, controller} scripts as our
# executable we circumvent this - the mechanism of shebanged scripts means that
# the python binary will be launched with argv[0] set correctly.
# This does mean that for HTCondor we require that:
# a. The python interpreter you are using is in a folder next to the ipengine,
# ipcluster and ipcontroller scripts
# b. I have no idea what the consequences are for Windows.
bin_dir = os.path.dirname(sys.executable)

# Full paths to the shebanged entry-point scripts, used by the HTCondor
# launchers instead of the `sys.executable -c ...` form above.
condor_ipcluster_cmd_argv = os.path.join(bin_dir, 'ipcluster')

condor_ipengine_cmd_argv = os.path.join(bin_dir, 'ipengine')

condor_ipcontroller_cmd_argv = os.path.join(bin_dir, 'ipcontroller')
103 103
104 104 #-----------------------------------------------------------------------------
105 105 # Base launchers and errors
106 106 #-----------------------------------------------------------------------------
107 107
class LauncherError(Exception):
    """Base class for all launcher errors in this module."""
    pass
110 110
111 111
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state."""
    pass
114 114
115 115
class UnknownStatus(LauncherError):
    """Raised when a launcher's process status cannot be determined."""
    pass
118 118
119 119
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # data recorded when the process starts/stops (e.g. pid, exit code)
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before' # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subcommand should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.
        """
        if self.state=='after':
            # already stopped: fire immediately with the recorded stop_data
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`."""

        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # drain the registered callbacks, calling each with the stop data
        while self.stop_callbacks:
            self.stop_callbacks.pop()(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
231 231
class ClusterAppMixin(HasTraits):
    """MixIn for cluster args as traits"""
    # Profile directory and cluster id identifying the cluster a launched
    # process belongs to; both are forwarded on the command line.
    profile_dir=Unicode('')
    cluster_id=Unicode('')

    @property
    def cluster_args(self):
        """Command-line arguments identifying the profile dir and cluster id."""
        return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
240 240
class ControllerMixin(ClusterAppMixin):
    """MixIn providing the ipcontroller command and arguments as traits."""
    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
        help="""command-line args to pass to ipcontroller""")
247 247
class EngineMixin(ClusterAppMixin):
    """MixIn providing the ipengine command and arguments as traits."""
    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
255 255
256 256
257 257 #-----------------------------------------------------------------------------
258 258 # Local process launchers
259 259 #-----------------------------------------------------------------------------
260 260
261 261
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    # how often to poll the subprocess for exit, in ms
    poll_frequency = Integer(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Launch the subprocess and wire its stdout/stderr into the ioloop."""
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state == 'before':
            self.process = Popen(self.args,
                stdout=PIPE,stderr=PIPE,stdin=PIPE,
                env=os.environ,
                cwd=self.work_dir
            )
            if WINDOWS:
                # Windows pipes cannot be watched directly by the ioloop;
                # forward_read_events relays their data as readable events.
                self.stdout = forward_read_events(self.process.stdout)
                self.stderr = forward_read_events(self.process.stderr)
            else:
                self.stdout = self.process.stdout.fileno()
                self.stderr = self.process.stderr.fileno()
            self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
            self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
            # poll periodically so exit is noticed even without pipe activity
            self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
            self.poller.start()
            self.notify_start(self.process.pid)
        else:
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        # silently ignored unless the process is currently running
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            self.log.debug("interrupt failed")
            pass
        # schedule the follow-up KILL; signal() is a no-op once the state
        # is no longer 'running', so this is safe if INT already worked
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        """ioloop callback: forward one line of child stdout to our log."""
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        """ioloop callback: forward one line of child stderr to our log."""
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check for process exit; on exit, tear down ioloop handlers and
        fire stop notifications. Returns the exit code, or None if running."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
360 360
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Assemble the full ipcontroller command line from its pieces."""
        argv = list(self.controller_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.controller_args)
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
370 370
371 371
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Build the full ipengine command line."""
        return self.engine_cmd + self.cluster_args + self.engine_args
377 377
378 378
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # one launcher per engine, keyed by engine index
    launchers = Dict()
    # stop_data from each engine, accumulated as they exit
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        dlist = []
        for i in range(n):
            if i > 0:
                # stagger startup (see `delay` above)
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Forward `sig` to every engine launcher."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """INT-then-KILL every engine launcher after `delay` seconds."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        return dlist

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """on_stop callback: record one engine's exit; when the last engine
        is gone, fire our own stop notification with all collected data.
        NOTE(review): assumes `data['pid']` matches some tracked launcher."""
        pid = data['pid']
        for idx,el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
449 449
450 450
451 451 #-----------------------------------------------------------------------------
452 452 # MPI launchers
453 453 #-----------------------------------------------------------------------------
454 454
455 455
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of MPI processes to launch; set by start(n)
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        # deprecation for old MPIExec names:
        config = kwargs.get('config', {})
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                # migrate the old config section onto the new class name
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
493 493
494 494
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The command mpiexec runs: ipcontroller."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Cluster-identifying args plus configured controller args."""
        return self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        return super(MPIControllerLauncher, self).start(1)
512 512
513 513
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The command mpiexec runs: ipengine."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Cluster-identifying args plus configured engine args."""
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        # MPILauncher.start also assigns self.n; setting it here is harmless
        self.n = n
        return super(MPIEngineSetLauncher, self).start(n)
532 532
533 533 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin that logs a deprecation warning for old MPIExec* class names."""
    def warn(self):
        old = type(self).__name__
        new = old.replace('MPIExec', 'MPI')
        self.log.warn("WARNING: %s name is deprecated, use %s", old, new)
539 539
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecLauncher, self).__init__(*args, **kwargs)
        # emit the deprecation warning (see DeprecatedMPILauncher)
        self.warn()
545 545
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
        # emit the deprecation warning (see DeprecatedMPILauncher)
        self.warn()
551 551
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
        # emit the deprecation warning (see DeprecatedMPILauncher)
        self.warn()
557 557
558 558
559 559 #-----------------------------------------------------------------------------
560 560 # SSH launchers
561 561 #-----------------------------------------------------------------------------
562 562
563 563 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
564 564
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        # trait observer: keep `location` consistent with user/hostname
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # trait observer: keep `location` consistent with user/hostname
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        # quote the remote command so it survives the remote shell
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file"""
        remote = "%s:%s" % (self.location, remote)
        # wait up to 10s for the local file to exist before copying
        for i in range(10):
            if not os.path.exists(local):
                self.log.debug("waiting for %s" % local)
                time.sleep(1)
            else:
                break
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file"""
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                        [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.decode(DEFAULT_ENCODING, 'replace').strip()
            if check == u'no':
                time.sleep(1)
            elif check == u'yes':
                break
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, launch the remote program, then fetch result files."""
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        # NOTE(review): `sig` itself is ignored; any signal closes the session
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
663 663
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
    """SSH launcher that distinguishes remote from local profile dirs."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        if not self.remote_profile_dir:
            # trigger remote_profile_dir_default logic again,
            # in case it was already triggered before profile_dir was set
            self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        if not home.endswith('/'):
            home = home+'/'

        if path.startswith(home):
            return path[len(home):]
        else:
            return path

    def _remote_profile_dir_default(self):
        # default: same profile dir, relative to the remote home directory
        return self._strip_home(self.profile_dir)

    def _cluster_id_changed(self, name, old, new):
        # SSH launchers only support the default (empty) cluster id
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Override ClusterAppMixin: use the *remote* profile dir, no cluster-id."""
        return ['--profile-dir', self.remote_profile_dir]
700 700
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _controller_cmd_default(self):
        # rely on PATH on the remote end, rather than the local sys.executable
        return ['ipcontroller']

    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args

    def _to_fetch_default(self):
        # retrieve the connection files written by the remote controller
        return [
            (os.path.join(self.remote_profile_dir, 'security', cf),
             os.path.join(self.profile_dir, 'security', cf),)
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
724 724
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _engine_cmd_default(self):
        # rely on PATH on the remote end, rather than the local sys.executable
        return ['ipengine']

    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    def _to_send_default(self):
        # ship the local connection files to the remote profile before starting
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
748 748
749 749
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines on remote hosts via ssh."""
    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
                help="""dict of engines to launch. This is a dict by hostname of ints,
                corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        # rely on PATH on the remote end, rather than the local sys.executable
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        count = 0
        for n in self.engines.itervalues():
            if isinstance(n, (tuple,list)):
                # (count, args) form: only the count matters here
                n,args = n
            count += n
        return count

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        dlist = []
        for host, n in self.engines.iteritems():
            if isinstance(n, (tuple, list)):
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                if i > 0:
                    # stagger startup (see `delay`)
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                        profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                # deepcopy, as LocalEngineSetLauncher does, so launchers never
                # share (and can never mutate) the same list objects.
                el.engine_cmd = copy.deepcopy(self.engine_cmd)
                el.engine_args = copy.deepcopy(args)
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers[ "%s/%i" % (host,i) ] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
804 804
805 805
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    # number of engines to request; set by start(n)
    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        # ship the local connection files to the remote profile before starting
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]

    def start(self, n):
        """Record the engine count and launch `ipcluster engines` over ssh."""
        self.n = n
        super(SSHProxyEngineSetLauncher, self).start()
834 834
835 835
836 836 #-----------------------------------------------------------------------------
837 837 # Windows HPC Server 2008 scheduler launchers
838 838 #-----------------------------------------------------------------------------
839 839
840 840
841 841 # This is only used on Windows.
# This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC 'job' command, falling back to the bare name."""
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
851 851
852 852
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher submitting jobs via the Windows HPC 'job' command."""

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this assignment is shadowed by the `job_file` property
    # defined below, so the property is what callers actually see.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job XML file, inside work_dir."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the XML job description for n processes; subclass hook."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job via the scheduler; best-effort."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        except Exception:
            # narrowed from a bare `except:`; the job may already be gone
            # and stop() should not raise in that case
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
933 933
934 934
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch an ipcontroller as a single-task Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    # not user-configurable: extra controller args are injected programmatically
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a WinHPC XML job description containing one controller task."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir and from self.start().
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # unlike the base class, the job file is written into the profile dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        return super(WindowsHPCControllerLauncher, self).start(1)
965 965
966 966
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines as one multi-task Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    # not user-configurable: extra engine args are injected programmatically
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write a WinHPC XML job description containing n engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and from self.start().
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # the job file is written into the profile dir, not work_dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
998 998
999 999
#-----------------------------------------------------------------------------
# Batch (PBS) system launchers
#-----------------------------------------------------------------------------

class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
    def _profile_dir_changed(self, name, old, new):
        # mirror trait changes into the batch-template context dict
        self.context[name] = new
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        # defaults must be seeded into the context explicitly, because the
        # _changed handlers only fire on non-default assignment
        self.context['profile_dir'] = ''
        return ''
    def _cluster_id_default(self):
        self.context['cluster_id'] = ''
        return ''
1016 1016
1017 1017
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    job_id_regexp_group = Integer(0, config=True,
        help="""The group we wish to match in job_id_regexp (0 to match all)""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # keep the template context dict in sync with the trait
        self.context[name] = new

    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        """The submission command line: submit_command plus the script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group(self.job_id_regexp_group)
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template
            # add jobarray or queue lines to user-specified template
            # note that this is *only* when user did not specify a template.
            self._insert_queue_in_script()
            self._insert_job_array_in_script()
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # make the script executable by its owner
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def _insert_queue_in_script(self):
        """Inserts a queue if required into the batch script."""
        # (removed a leftover debug `print` statement here)
        if self.queue and not self.queue_regexp.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            firstline, rest = self.batch_template.split('\n', 1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script."""
        # (removed a leftover debug `print` statement here)
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            firstline, rest = self.batch_template.split('\n', 1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Delete the batch job and notify observers with the command output."""
        output = check_output(self.delete_command + [self.job_id], env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
1159 1159
1160 1160
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # PBS job arrays are requested with a `#PBS -t 1-n` directive
    job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # queue selection: `#PBS -q queuename`
    queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1176 1176
1177 1177
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # default batch script: export env (-V), name the job, run ipcontroller
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))


    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(PBSControllerLauncher, self).start(1)
1193 1193
1194 1194
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # default batch script: export env (-V), name the job, run ipengine;
    # the job-array directive is inserted by BatchSystemLauncher
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipengine_cmd_argv))))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(PBSEngineSetLauncher, self).start(n)
1208 1208
1209 1209
#SGE is very similar to PBS

class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # SGE uses '#$' as its directive prefix instead of '#PBS'
    job_array_regexp = CRegExp('#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1218 1218
1219 1219
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # default batch script: export env (-V), select shell, name the job
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(SGEControllerLauncher, self).start(1)
1234 1234
1235 1235
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # default batch script; the '#$ -t' job-array line is inserted by
    # BatchSystemLauncher.write_batch_script
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(SGEEngineSetLauncher, self).start(n)
1249 1249
1250 1250
# LSF launchers

class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # LSF job arrays: `#BSUB -J name[1-n]`
    job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    # queue selection: `#BSUB -q queuename`
    queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives:
        bsub < script
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        piped_cmd = self.args[0] + '<\"' + self.args[1] + '\"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, err = p.communicate()
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1286 1286
1287 1287
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # default batch script: name the job and redirect stdout/stderr
    # (%%J expands to the LSF job id; doubled to survive %-formatting below)
    default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(LSFControllerLauncher, self).start(1)
1303 1303
1304 1304
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""
    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # default batch script; the '#BSUB -J ipengine[1-n]' array line is
    # inserted by BatchSystemLauncher.write_batch_script
    default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(LSFEngineSetLauncher, self).start(n)
1318 1318
1319 1319
# Condor Requires that we launch the ipengine/ipcontroller scripts rather
# that the python instance but otherwise is very similar to PBS

class CondorLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for Condor."""

    submit_command = List(['condor_submit'], config=True,
        help="The Condor submit command ['condor_submit']")
    delete_command = List(['condor_rm'], config=True,
        help="The Condor delete command ['condor_rm']")
    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
        help="Regular expression for identifying the job ID [r'(\d+)\.$']")
    job_id_regexp_group = Integer(1, config=True,
        help="""The group we wish to match in job_id_regexp [1]""")

    # multiple jobs are submitted with a trailing `queue n` statement
    job_array_regexp = CRegExp('queue\W+\$')
    job_array_template = Unicode('queue {n}')


    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            #Condor requires that the job array goes at the bottom of the script
            self.batch_template = '\n'.join([self.batch_template,
                self.job_array_template])

    def _insert_queue_in_script(self):
        """AFAIK, Condor doesn't have a concept of multiple queues that can be
        specified in the script.
        """
        # deliberately a no-op: Condor has no script-level queue directive
        pass
1355 1354
class CondorControllerLauncher(CondorLauncher, BatchClusterAppMixin):
    """Launch a controller using Condor."""

    batch_file_name = Unicode(u'condor_controller', config=True,
        help="batch file name for the controller job.")
    # NOTE(review): unlike the engine launcher's template, the `arguments`
    # line here is not wrapped in double quotes -- confirm this is intentional.
    default_template = Unicode(r"""
universe = vanilla
executable = %s
# by default we expect a shared file system
transfer_executable = False
arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""" % condor_ipcontroller_cmd_argv)

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(CondorControllerLauncher, self).start(1)
1372 1371
1373 1372
class CondorEngineSetLauncher(CondorLauncher, BatchClusterAppMixin):
    """Launch Engines using Condor"""
    batch_file_name = Unicode(u'condor_engines', config=True,
        help="batch file name for the engine(s) job.")
    # the `queue {n}` line is appended by CondorLauncher._insert_job_array_in_script
    default_template = Unicode("""
universe = vanilla
executable = %s
# by default we expect a shared file system
transfer_executable = False
arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""" % condor_ipengine_cmd_argv)

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(CondorEngineSetLauncher, self).start(n)
1389 1388
1390 1389
#-----------------------------------------------------------------------------
# A launcher for ipcluster itself!
#-----------------------------------------------------------------------------


class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    # ipcluster subcommand to run (e.g. 'start')
    ipcluster_subcommand = Unicode('start')
    # profile name and default engine count passed on the command line
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        # full argv: ipcluster <subcommand> --n=... --profile=... <extra args>
        return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
            ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
            self.ipcluster_args

    def start(self):
        """Start the ipcluster subprocess."""
        return super(IPClusterLauncher, self).start()
1415 1414
#-----------------------------------------------------------------------------
# Collections of launchers
#-----------------------------------------------------------------------------

# launchers that run processes on the local machine
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]
# launchers that start processes via mpiexec
mpi_launchers = [
    MPILauncher,
    MPIControllerLauncher,
    MPIEngineSetLauncher,
]
# launchers that start remote processes over SSH
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
    SSHProxyEngineSetLauncher,
]
# Windows HPC Server 2008 scheduler launchers
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
# PBS batch-system launchers
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]
# Sun GridEngine launchers
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]
# LSF batch-system launchers
lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]
# Condor launchers
condor_launchers = [
    CondorLauncher,
    CondorControllerLauncher,
    CondorEngineSetLauncher,
]
# every launcher class defined in this module
all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
                + pbs_launchers + sge_launchers + lsf_launchers + condor_launchers
General Comments 0
You need to be logged in to leave comments. Login now