removed noop start methods on EngineSetLaunchers
James Booth -
@@ -1,1460 +1,1443 @@
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows, Python >= 2.7 or >= 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
46 46 try:
47 47 from subprocess import check_output
48 48 except ImportError:
49 49 # pre-2.7, define check_output with Popen
50 50 def check_output(*args, **kwargs):
51 51 kwargs.update(dict(stdout=PIPE))
52 52 p = Popen(*args, **kwargs)
53 53 out,err = p.communicate()
54 54 return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.encoding import DEFAULT_ENCODING
65 65 from IPython.utils.path import get_home_dir
66 66 from IPython.utils.process import find_cmd, FindCmdError
67 67
68 68 from .win32support import forward_read_events
69 69
70 70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 71
72 72 WINDOWS = os.name == 'nt'
73 73
74 74 #-----------------------------------------------------------------------------
75 75 # Paths to the kernel apps
76 76 #-----------------------------------------------------------------------------
77 77
78 78 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
79 79
80 80 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
81 81
82 82 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
83 83
84 84 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
85 85
86 86 # HTCondor frustratingly destroys sys.executable when launching remote processes
87 87 # thus Python will default back to the system module search paths, which is
88 88 # ridiculously fragile (not to mention destructive for virtualenvs).
89 89 # however, if we use the ip{cluster, engine, controller} scripts as our
90 90 # executable we circumvent this - the mechanism of shebanged scripts means that
91 91 # the python binary will be launched with argv[0] set correctly.
92 92 # This does mean that for HTCondor we require that:
93 93 # a. The python interpreter you are using is in a folder next to the ipengine,
94 94 # ipcluster and ipcontroller scripts
95 95 # b. The consequences of this approach on Windows are unknown.
96 96 bin_dir = os.path.dirname(sys.executable)
97 97
98 98 condor_ipcluster_cmd_argv = os.path.join(bin_dir, 'ipcluster')
99 99
100 100 condor_ipengine_cmd_argv = os.path.join(bin_dir, 'ipengine')
101 101
102 102 condor_ipcontroller_cmd_argv = os.path.join(bin_dir, 'ipcontroller')
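# As a hypothetical illustration of the above: if sys.executable were
# '/opt/venv/bin/python', then bin_dir is '/opt/venv/bin' and the Condor
# 'executable' becomes '/opt/venv/bin/ipcontroller', whose shebang line
# re-launches that same interpreter with argv[0] set correctly.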
103 103
104 104 #-----------------------------------------------------------------------------
105 105 # Base launchers and errors
106 106 #-----------------------------------------------------------------------------
107 107
108 108 class LauncherError(Exception):
109 109 pass
110 110
111 111
112 112 class ProcessStateError(LauncherError):
113 113 pass
114 114
115 115
116 116 class UnknownStatus(LauncherError):
117 117 pass
118 118
119 119
120 120 class BaseLauncher(LoggingConfigurable):
121 121 """An asbtraction for starting, stopping and signaling a process."""
122 122
123 123 # In all of the launchers, the work_dir is where child processes will be
124 124 # run. This will usually be the profile_dir, but may not be. Any work_dir
125 125 # passed into the __init__ method will override the config value.
126 126 # This should not be used to set the work_dir for the actual engine
127 127 # and controller. Instead, use their own config files or the
128 128 # controller_args, engine_args attributes of the launchers to add
129 129 # the work_dir option.
130 130 work_dir = Unicode(u'.')
131 131 loop = Instance('zmq.eventloop.ioloop.IOLoop')
132 132
133 133 start_data = Any()
134 134 stop_data = Any()
135 135
136 136 def _loop_default(self):
137 137 return ioloop.IOLoop.instance()
138 138
139 139 def __init__(self, work_dir=u'.', config=None, **kwargs):
140 140 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
141 141 self.state = 'before' # can be before, running, after
142 142 self.stop_callbacks = []
143 143 self.start_data = None
144 144 self.stop_data = None
145 145
146 146 @property
147 147 def args(self):
148 148 """A list of cmd and args that will be used to start the process.
149 149
150 150 This is what is passed to :func:`spawnProcess` and the first element
151 151 will be the process name.
152 152 """
153 153 return self.find_args()
154 154
155 155 def find_args(self):
156 156 """The ``.args`` property calls this to find the args list.
157 157
158 158 Subclasses should implement this to construct the cmd and args.
159 159 """
160 160 raise NotImplementedError('find_args must be implemented in a subclass')
161 161
162 162 @property
163 163 def arg_str(self):
164 164 """The string form of the program arguments."""
165 165 return ' '.join(self.args)
166 166
167 167 @property
168 168 def running(self):
169 169 """Am I running."""
170 170 if self.state == 'running':
171 171 return True
172 172 else:
173 173 return False
174 174
175 175 def start(self):
176 176 """Start the process."""
177 177 raise NotImplementedError('start must be implemented in a subclass')
178 178
179 179 def stop(self):
180 180 """Stop the process and notify observers of stopping.
181 181
182 182 This method will return None immediately.
183 183 To observe the actual process stopping, see :meth:`on_stop`.
184 184 """
185 185 raise NotImplementedError('stop must be implemented in a subclass')
186 186
187 187 def on_stop(self, f):
188 188 """Register a callback to be called with this Launcher's stop_data
189 189 when the process actually finishes.
190 190 """
191 191 if self.state=='after':
192 192 return f(self.stop_data)
193 193 else:
194 194 self.stop_callbacks.append(f)
195 195
196 196 def notify_start(self, data):
197 197 """Call this to trigger startup actions.
198 198
199 199 This logs the process startup and sets the state to 'running'. It is
200 200 a pass-through so it can be used as a callback.
201 201 """
202 202
203 203 self.log.debug('Process %r started: %r', self.args[0], data)
204 204 self.start_data = data
205 205 self.state = 'running'
206 206 return data
207 207
208 208 def notify_stop(self, data):
209 209 """Call this to trigger process stop actions.
210 210
211 211 This logs the process stopping and sets the state to 'after'. Call
212 212 this to trigger callbacks registered via :meth:`on_stop`."""
213 213
214 214 self.log.debug('Process %r stopped: %r', self.args[0], data)
215 215 self.stop_data = data
216 216 self.state = 'after'
217 217 for i in range(len(self.stop_callbacks)):
218 218 d = self.stop_callbacks.pop()
219 219 d(data)
220 220 return data
221 221
222 222 def signal(self, sig):
223 223 """Signal the process.
224 224
225 225 Parameters
226 226 ----------
227 227 sig : str or int
228 228 'KILL', 'INT', etc., or any signal number
229 229 """
230 230 raise NotImplementedError('signal must be implemented in a subclass')
231 231
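# A minimal sketch (hypothetical, not part of this module) of how a
# BaseLauncher subclass ties the pieces together: implement find_args/start/
# stop, and call notify_start/notify_stop so that on_stop callbacks fire:
#
#     class EchoLauncher(BaseLauncher):
#         def find_args(self):
#             return ['echo', 'hello']
#         def start(self):
#             return self.notify_start({'pid': None})
#         def stop(self):
#             return self.notify_stop({'exit_code': 0})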
232 232 class ClusterAppMixin(HasTraits):
233 233 """MixIn for cluster args as traits"""
234 234 profile_dir=Unicode('')
235 235 cluster_id=Unicode('')
236 236
237 237 @property
238 238 def cluster_args(self):
239 239 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
240 240
241 241 class ControllerMixin(ClusterAppMixin):
242 242 controller_cmd = List(ipcontroller_cmd_argv, config=True,
243 243 help="""Popen command to launch ipcontroller.""")
244 244 # Command line arguments to ipcontroller.
245 245 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
246 246 help="""command-line args to pass to ipcontroller""")
247 247
248 248 class EngineMixin(ClusterAppMixin):
249 249 engine_cmd = List(ipengine_cmd_argv, config=True,
250 250 help="""command to launch the Engine.""")
251 251 # Command line arguments for ipengine.
252 252 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
253 253 help="command-line arguments to pass to ipengine"
254 254 )
255 255
256 256
257 257 #-----------------------------------------------------------------------------
258 258 # Local process launchers
259 259 #-----------------------------------------------------------------------------
260 260
261 261
262 262 class LocalProcessLauncher(BaseLauncher):
263 263 """Start and stop an external process in an asynchronous manner.
264 264
265 265 This will launch the external process with a working directory of
266 266 ``self.work_dir``.
267 267 """
268 268
269 269 # This is used to construct self.args, which is passed to
270 270 # spawnProcess.
271 271 cmd_and_args = List([])
272 272 poll_frequency = Integer(100) # in ms
273 273
274 274 def __init__(self, work_dir=u'.', config=None, **kwargs):
275 275 super(LocalProcessLauncher, self).__init__(
276 276 work_dir=work_dir, config=config, **kwargs
277 277 )
278 278 self.process = None
279 279 self.poller = None
280 280
281 281 def find_args(self):
282 282 return self.cmd_and_args
283 283
284 284 def start(self):
285 285 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
286 286 if self.state == 'before':
287 287 self.process = Popen(self.args,
288 288 stdout=PIPE,stderr=PIPE,stdin=PIPE,
289 289 env=os.environ,
290 290 cwd=self.work_dir
291 291 )
292 292 if WINDOWS:
293 293 self.stdout = forward_read_events(self.process.stdout)
294 294 self.stderr = forward_read_events(self.process.stderr)
295 295 else:
296 296 self.stdout = self.process.stdout.fileno()
297 297 self.stderr = self.process.stderr.fileno()
298 298 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
299 299 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
300 300 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
301 301 self.poller.start()
302 302 self.notify_start(self.process.pid)
303 303 else:
304 304 s = 'The process was already started and has state: %r' % self.state
305 305 raise ProcessStateError(s)
306 306
307 307 def stop(self):
308 308 return self.interrupt_then_kill()
309 309
310 310 def signal(self, sig):
311 311 if self.state == 'running':
312 312 if WINDOWS and sig != SIGINT:
313 313 # use Windows tree-kill for better child cleanup
314 314 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
315 315 else:
316 316 self.process.send_signal(sig)
317 317
318 318 def interrupt_then_kill(self, delay=2.0):
319 319 """Send INT, wait a delay and then send KILL."""
320 320 try:
321 321 self.signal(SIGINT)
322 322 except Exception:
323 323 self.log.debug("interrupt failed")
324 324 pass
325 325 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
326 326 self.killer.start()
327 327
328 328 # callbacks, etc:
329 329
330 330 def handle_stdout(self, fd, events):
331 331 if WINDOWS:
332 332 line = self.stdout.recv()
333 333 else:
334 334 line = self.process.stdout.readline()
335 335 # a stopped process will be readable but return empty strings
336 336 if line:
337 337 self.log.debug(line[:-1])
338 338 else:
339 339 self.poll()
340 340
341 341 def handle_stderr(self, fd, events):
342 342 if WINDOWS:
343 343 line = self.stderr.recv()
344 344 else:
345 345 line = self.process.stderr.readline()
346 346 # a stopped process will be readable but return empty strings
347 347 if line:
348 348 self.log.debug(line[:-1])
349 349 else:
350 350 self.poll()
351 351
352 352 def poll(self):
353 353 status = self.process.poll()
354 354 if status is not None:
355 355 self.poller.stop()
356 356 self.loop.remove_handler(self.stdout)
357 357 self.loop.remove_handler(self.stderr)
358 358 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
359 359 return status
360 360
361 361 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
362 362 """Launch a controller as a regular external process."""
363 363
364 364 def find_args(self):
365 365 return self.controller_cmd + self.cluster_args + self.controller_args
366 366
367 367 def start(self):
368 368 """Start the controller by profile_dir."""
369 369 return super(LocalControllerLauncher, self).start()
370 370
371 371
372 372 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
373 373 """Launch a single engine as a regular externall process."""
374 374
375 375 def find_args(self):
376 376 return self.engine_cmd + self.cluster_args + self.engine_args
377 377
378 378
379 379 class LocalEngineSetLauncher(LocalEngineLauncher):
380 380 """Launch a set of engines as regular external processes."""
381 381
382 382 delay = CFloat(0.1, config=True,
383 383 help="""delay (in seconds) between starting each engine after the first.
384 384 This can help force the engines to get their ids in order, or limit
385 385 process flood when starting many engines."""
386 386 )
387 387
388 388 # launcher class
389 389 launcher_class = LocalEngineLauncher
390 390
391 391 launchers = Dict()
392 392 stop_data = Dict()
393 393
394 394 def __init__(self, work_dir=u'.', config=None, **kwargs):
395 395 super(LocalEngineSetLauncher, self).__init__(
396 396 work_dir=work_dir, config=config, **kwargs
397 397 )
398 398 self.stop_data = {}
399 399
400 400 def start(self, n):
401 401 """Start n engines by profile or profile_dir."""
402 402 dlist = []
403 403 for i in range(n):
404 404 if i > 0:
405 405 time.sleep(self.delay)
406 406 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
407 407 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
408 408 )
409 409
410 410 # Copy the engine args over to each engine launcher.
411 411 el.engine_cmd = copy.deepcopy(self.engine_cmd)
412 412 el.engine_args = copy.deepcopy(self.engine_args)
413 413 el.on_stop(self._notice_engine_stopped)
414 414 d = el.start()
415 415 self.launchers[i] = el
416 416 dlist.append(d)
417 417 self.notify_start(dlist)
418 418 return dlist
419 419
420 420 def find_args(self):
421 421 return ['engine set']
422 422
423 423 def signal(self, sig):
424 424 dlist = []
425 425 for el in self.launchers.itervalues():
426 426 d = el.signal(sig)
427 427 dlist.append(d)
428 428 return dlist
429 429
430 430 def interrupt_then_kill(self, delay=1.0):
431 431 dlist = []
432 432 for el in self.launchers.itervalues():
433 433 d = el.interrupt_then_kill(delay)
434 434 dlist.append(d)
435 435 return dlist
436 436
437 437 def stop(self):
438 438 return self.interrupt_then_kill()
439 439
440 440 def _notice_engine_stopped(self, data):
441 441 pid = data['pid']
442 442 for idx,el in self.launchers.iteritems():
443 443 if el.process.pid == pid:
444 444 break
445 445 self.launchers.pop(idx)
446 446 self.stop_data[idx] = data
447 447 if not self.launchers:
448 448 self.notify_stop(self.stop_data)
449 449
450 450
451 451 #-----------------------------------------------------------------------------
452 452 # MPI launchers
453 453 #-----------------------------------------------------------------------------
454 454
455 455
456 456 class MPILauncher(LocalProcessLauncher):
457 457 """Launch an external process using mpiexec."""
458 458
459 459 mpi_cmd = List(['mpiexec'], config=True,
460 460 help="The mpiexec command to use in starting the process."
461 461 )
462 462 mpi_args = List([], config=True,
463 463 help="The command line arguments to pass to mpiexec."
464 464 )
465 465 program = List(['date'],
466 466 help="The program to start via mpiexec.")
467 467 program_args = List([],
468 468 help="The command line argument to the program."
469 469 )
470 470 n = Integer(1)
471 471
472 472 def __init__(self, *args, **kwargs):
473 473 # deprecation for old MPIExec names:
474 474 config = kwargs.get('config', {})
475 475 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
476 476 deprecated = config.get(oldname)
477 477 if deprecated:
478 478 newname = oldname.replace('MPIExec', 'MPI')
479 479 config[newname].update(deprecated)
480 480 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
481 481
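# The remapping above means that (hypothetically) a legacy setting such as
#     c.MPIExecLauncher.mpi_cmd = ['mpirun']
# is copied into c.MPILauncher.mpi_cmd, with a deprecation warning logged.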
482 482 super(MPILauncher, self).__init__(*args, **kwargs)
483 483
484 484 def find_args(self):
485 485 """Build self.args using all the fields."""
486 486 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
487 487 self.program + self.program_args
488 488
489 489 def start(self, n):
490 490 """Start n instances of the program using mpiexec."""
491 491 self.n = n
492 492 return super(MPILauncher, self).start()
493 493
494 494
495 495 class MPIControllerLauncher(MPILauncher, ControllerMixin):
496 496 """Launch a controller using mpiexec."""
497 497
498 498 # alias back to *non-configurable* program[_args] for use in find_args()
499 499 # this way all Controller/EngineSetLaunchers have the same form, rather
500 500 # than *some* having `program_args` and others `controller_args`
501 501 @property
502 502 def program(self):
503 503 return self.controller_cmd
504 504
505 505 @property
506 506 def program_args(self):
507 507 return self.cluster_args + self.controller_args
508 508
509 509 def start(self):
510 510 """Start the controller by profile_dir."""
511 511 return super(MPIControllerLauncher, self).start(1)
512 512
513 513
514 514 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
515 515 """Launch engines using mpiexec"""
516 516
517 517 # alias back to *non-configurable* program[_args] for use in find_args()
518 518 # this way all Controller/EngineSetLaunchers have the same form, rather
519 519 # than *some* having `program_args` and others `controller_args`
520 520 @property
521 521 def program(self):
522 522 return self.engine_cmd
523 523
524 524 @property
525 525 def program_args(self):
526 526 return self.cluster_args + self.engine_args
527 527
528 528 def start(self, n):
529 529 """Start n engines by profile or profile_dir."""
530 530 self.n = n
531 531 return super(MPIEngineSetLauncher, self).start(n)
532 532
533 533 # deprecated MPIExec names
534 534 class DeprecatedMPILauncher(object):
535 535 def warn(self):
536 536 oldname = self.__class__.__name__
537 537 newname = oldname.replace('MPIExec', 'MPI')
538 538 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
539 539
540 540 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
541 541 """Deprecated, use MPILauncher"""
542 542 def __init__(self, *args, **kwargs):
543 543 super(MPIExecLauncher, self).__init__(*args, **kwargs)
544 544 self.warn()
545 545
546 546 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
547 547 """Deprecated, use MPIControllerLauncher"""
548 548 def __init__(self, *args, **kwargs):
549 549 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
550 550 self.warn()
551 551
552 552 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
553 553 """Deprecated, use MPIEngineSetLauncher"""
554 554 def __init__(self, *args, **kwargs):
555 555 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
556 556 self.warn()
557 557
558 558
559 559 #-----------------------------------------------------------------------------
560 560 # SSH launchers
561 561 #-----------------------------------------------------------------------------
562 562
563 563 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
564 564
565 565 class SSHLauncher(LocalProcessLauncher):
566 566 """A minimal launcher for ssh.
567 567
568 568 To be useful this will probably have to be extended to use the ``sshx``
569 569 idea for environment variables. There could be other things this needs
570 570 as well.
571 571 """
572 572
573 573 ssh_cmd = List(['ssh'], config=True,
574 574 help="command for starting ssh")
575 575 ssh_args = List(['-tt'], config=True,
576 576 help="args to pass to ssh")
577 577 scp_cmd = List(['scp'], config=True,
578 578 help="command for sending files")
579 579 program = List(['date'],
580 580 help="Program to launch via ssh")
581 581 program_args = List([],
582 582 help="args to pass to remote program")
583 583 hostname = Unicode('', config=True,
584 584 help="hostname on which to launch the program")
585 585 user = Unicode('', config=True,
586 586 help="username for ssh")
587 587 location = Unicode('', config=True,
588 588 help="user@hostname location for ssh in one setting")
589 589 to_fetch = List([], config=True,
590 590 help="List of (remote, local) files to fetch after starting")
591 591 to_send = List([], config=True,
592 592 help="List of (local, remote) files to send before starting")
593 593
594 594 def _hostname_changed(self, name, old, new):
595 595 if self.user:
596 596 self.location = u'%s@%s' % (self.user, new)
597 597 else:
598 598 self.location = new
599 599
600 600 def _user_changed(self, name, old, new):
601 601 self.location = u'%s@%s' % (new, self.hostname)
602 602
603 603 def find_args(self):
604 604 return self.ssh_cmd + self.ssh_args + [self.location] + \
605 605 list(map(pipes.quote, self.program + self.program_args))
606 606
607 607 def _send_file(self, local, remote):
608 608 """send a single file"""
609 609 remote = "%s:%s" % (self.location, remote)
610 610 for i in range(10):
611 611 if not os.path.exists(local):
612 612 self.log.debug("waiting for %s" % local)
613 613 time.sleep(1)
614 614 else:
615 615 break
616 616 self.log.info("sending %s to %s", local, remote)
617 617 check_output(self.scp_cmd + [local, remote])
618 618
619 619 def send_files(self):
620 620 """send our files (called before start)"""
621 621 if not self.to_send:
622 622 return
623 623 for local_file, remote_file in self.to_send:
624 624 self._send_file(local_file, remote_file)
625 625
626 626 def _fetch_file(self, remote, local):
627 627 """fetch a single file"""
628 628 full_remote = "%s:%s" % (self.location, remote)
629 629 self.log.info("fetching %s from %s", local, full_remote)
630 630 for i in range(10):
631 631 # wait up to 10s for remote file to exist
632 632 check = check_output(self.ssh_cmd + self.ssh_args + \
633 633 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
634 634 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
635 635 if check == u'no':
636 636 time.sleep(1)
637 637 elif check == u'yes':
638 638 break
639 639 check_output(self.scp_cmd + [full_remote, local])
640 640
641 641 def fetch_files(self):
642 642 """fetch remote files (called after start)"""
643 643 if not self.to_fetch:
644 644 return
645 645 for remote_file, local_file in self.to_fetch:
646 646 self._fetch_file(remote_file, local_file)
647 647
648 648 def start(self, hostname=None, user=None):
649 649 if hostname is not None:
650 650 self.hostname = hostname
651 651 if user is not None:
652 652 self.user = user
653 653
654 654 self.send_files()
655 655 super(SSHLauncher, self).start()
656 656 self.fetch_files()
657 657
658 658 def signal(self, sig):
659 659 if self.state == 'running':
660 660 # send escaped ssh connection-closer
661 661 self.process.stdin.write('~.')
662 662 self.process.stdin.flush()
663 663
664 664 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
665 665
666 666 remote_profile_dir = Unicode('', config=True,
667 667 help="""The remote profile_dir to use.
668 668
669 669 If not specified, use the calling profile, stripping out any leading homedir.
670 670 """)
671 671
672 672 def _profile_dir_changed(self, name, old, new):
673 673 if not self.remote_profile_dir:
674 674 # trigger remote_profile_dir_default logic again,
675 675 # in case it was already triggered before profile_dir was set
676 676 self.remote_profile_dir = self._strip_home(new)
677 677
678 678 @staticmethod
679 679 def _strip_home(path):
680 680 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
681 681 home = get_home_dir()
682 682 if not home.endswith('/'):
683 683 home = home+'/'
684 684
685 685 if path.startswith(home):
686 686 return path[len(home):]
687 687 else:
688 688 return path
689 689
690 690 def _remote_profile_dir_default(self):
691 691 return self._strip_home(self.profile_dir)
692 692
693 693 def _cluster_id_changed(self, name, old, new):
694 694 if new:
695 695 raise ValueError("cluster id not supported by SSH launchers")
696 696
697 697 @property
698 698 def cluster_args(self):
699 699 return ['--profile-dir', self.remote_profile_dir]
700 700
701 701 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
702 702
703 703 # alias back to *non-configurable* program[_args] for use in find_args()
704 704 # this way all Controller/EngineSetLaunchers have the same form, rather
705 705 # than *some* having `program_args` and others `controller_args`
706 706
707 707 def _controller_cmd_default(self):
708 708 return ['ipcontroller']
709 709
710 710 @property
711 711 def program(self):
712 712 return self.controller_cmd
713 713
714 714 @property
715 715 def program_args(self):
716 716 return self.cluster_args + self.controller_args
717 717
718 718 def _to_fetch_default(self):
719 719 return [
720 720 (os.path.join(self.remote_profile_dir, 'security', cf),
721 721 os.path.join(self.profile_dir, 'security', cf),)
722 722 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
723 723 ]
724 724
725 725 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
726 726
727 727 # alias back to *non-configurable* program[_args] for use in find_args()
728 728 # this way all Controller/EngineSetLaunchers have the same form, rather
729 729 # than *some* having `program_args` and others `controller_args`
730 730
731 731 def _engine_cmd_default(self):
732 732 return ['ipengine']
733 733
734 734 @property
735 735 def program(self):
736 736 return self.engine_cmd
737 737
738 738 @property
739 739 def program_args(self):
740 740 return self.cluster_args + self.engine_args
741 741
742 742 def _to_send_default(self):
743 743 return [
744 744 (os.path.join(self.profile_dir, 'security', cf),
745 745 os.path.join(self.remote_profile_dir, 'security', cf))
746 746 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
747 747 ]
748 748
749 749
750 750 class SSHEngineSetLauncher(LocalEngineSetLauncher):
751 751 launcher_class = SSHEngineLauncher
752 752 engines = Dict(config=True,
753 753 help="""dict of engines to launch. This is a dict by hostname of ints,
754 754 corresponding to the number of engines to start on that host.""")
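# A hypothetical config sketch: values may be plain ints or (n, args) tuples,
# and hosts may include a user as 'user@host' (see start() below), e.g.
#     c.SSHEngineSetLauncher.engines = {
#         'host1': 2,
#         'alice@host2': (3, ['--log-level=DEBUG']),
#     }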
755 755
756 756 def _engine_cmd_default(self):
757 757 return ['ipengine']
758 758
759 759 @property
760 760 def engine_count(self):
761 761 """determine engine count from `engines` dict"""
762 762 count = 0
763 763 for n in self.engines.itervalues():
764 764 if isinstance(n, (tuple,list)):
765 765 n,args = n
766 766 count += n
767 767 return count
768 768
769 769 def start(self, n):
770 770 """Start engines by profile or profile_dir.
771 771 `n` is ignored, and the `engines` config property is used instead.
772 772 """
773 773
774 774 dlist = []
775 775 for host, n in self.engines.iteritems():
776 776 if isinstance(n, (tuple, list)):
777 777 n, args = n
778 778 else:
779 779 args = copy.deepcopy(self.engine_args)
780 780
781 781 if '@' in host:
782 782 user,host = host.split('@',1)
783 783 else:
784 784 user=None
785 785 for i in range(n):
786 786 if i > 0:
787 787 time.sleep(self.delay)
788 788 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
789 789 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
790 790 )
791 791 if i > 0:
792 792 # only send files for the first engine on each host
793 793 el.to_send = []
794 794
795 795 # Copy the engine args over to each engine launcher.
796 796 el.engine_cmd = self.engine_cmd
797 797 el.engine_args = args
798 798 el.on_stop(self._notice_engine_stopped)
799 799 d = el.start(user=user, hostname=host)
800 800 self.launchers[ "%s/%i" % (host,i) ] = el
801 801 dlist.append(d)
802 802 self.notify_start(dlist)
803 803 return dlist
804 804
805 805
806 806 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
807 807 """Launcher for calling
808 808 `ipcluster engines` on a remote machine.
809 809
810 810 Requires that the remote profile is already configured.
811 811 """
812 812
813 813 n = Integer()
814 814 ipcluster_cmd = List(['ipcluster'], config=True)
815 815
816 816 @property
817 817 def program(self):
818 818 return self.ipcluster_cmd + ['engines']
819 819
820 820 @property
821 821 def program_args(self):
822 822 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
823 823
824 824 def _to_send_default(self):
825 825 return [
826 826 (os.path.join(self.profile_dir, 'security', cf),
827 827 os.path.join(self.remote_profile_dir, 'security', cf))
828 828 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
829 829 ]
830 830
831 831 def start(self, n):
832 832 self.n = n
833 833 super(SSHProxyEngineSetLauncher, self).start()
834 834
835 835
836 836 #-----------------------------------------------------------------------------
837 837 # Windows HPC Server 2008 scheduler launchers
838 838 #-----------------------------------------------------------------------------
839 839
840 840
841 841 # This is only used on Windows.
842 842 def find_job_cmd():
843 843 if WINDOWS:
844 844 try:
845 845 return find_cmd('job')
846 846 except (FindCmdError, ImportError):
847 847 # ImportError will be raised if win32api is not installed
848 848 return 'job'
849 849 else:
850 850 return 'job'
851 851
852 852
853 853 class WindowsHPCLauncher(BaseLauncher):
854 854
855 855 job_id_regexp = CRegExp(r'\d+', config=True,
856 856 help="""A regular expression used to get the job id from the output of the
857 857 submit_command. """
858 858 )
859 859 job_file_name = Unicode(u'ipython_job.xml', config=True,
860 860 help="The filename of the instantiated job script.")
861 861 # The full path to the instantiated job script. This gets made dynamically
862 862 # by combining the work_dir with the job_file_name.
863 863 job_file = Unicode(u'')
864 864 scheduler = Unicode('', config=True,
865 865 help="The hostname of the scheduler to submit the job to.")
866 866 job_cmd = Unicode(find_job_cmd(), config=True,
867 867 help="The command for submitting jobs.")
868 868
869 869 def __init__(self, work_dir=u'.', config=None, **kwargs):
870 870 super(WindowsHPCLauncher, self).__init__(
871 871 work_dir=work_dir, config=config, **kwargs
872 872 )
873 873
874 874 @property
875 875 def job_file(self):
876 876 return os.path.join(self.work_dir, self.job_file_name)
877 877
878 878 def write_job_file(self, n):
879 879 raise NotImplementedError("Implement write_job_file in a subclass.")
880 880
881 881 def find_args(self):
882 882 return [u'job.exe']
883 883
884 884 def parse_job_id(self, output):
885 885 """Take the output of the submit command and return the job id."""
886 886 m = self.job_id_regexp.search(output)
887 887 if m is not None:
888 888 job_id = m.group()
889 889 else:
890 890 raise LauncherError("Job id couldn't be determined: %s" % output)
891 891 self.job_id = job_id
892 892 self.log.info('Job started with id: %r', job_id)
893 893 return job_id
894 894
895 895 def start(self, n):
896 896 """Start n copies of the process using the Win HPC job scheduler."""
897 897 self.write_job_file(n)
898 898 args = [
899 899 'submit',
900 900 '/jobfile:%s' % self.job_file,
901 901 '/scheduler:%s' % self.scheduler
902 902 ]
903 903 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
904 904
905 905 output = check_output([self.job_cmd]+args,
906 906 env=os.environ,
907 907 cwd=self.work_dir,
908 908 stderr=STDOUT
909 909 )
910 910 output = output.decode(DEFAULT_ENCODING, 'replace')
911 911 job_id = self.parse_job_id(output)
912 912 self.notify_start(job_id)
913 913 return job_id
914 914
915 915 def stop(self):
916 916 args = [
917 917 'cancel',
918 918 self.job_id,
919 919 '/scheduler:%s' % self.scheduler
920 920 ]
921 921 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
922 922 try:
923 923 output = check_output([self.job_cmd]+args,
924 924 env=os.environ,
925 925 cwd=self.work_dir,
926 926 stderr=STDOUT
927 927 )
928 928 output = output.decode(DEFAULT_ENCODING, 'replace')
929 929 except:
930 930 output = u'The job already appears to be stopped: %r' % self.job_id
931 931 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
932 932 return output
933 933
934 934
935 935 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
936 936
937 937 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
938 938 help="WinHPC xml job file.")
939 939 controller_args = List([], config=False,
940 940 help="extra args to pass to ipcontroller")
941 941
942 942 def write_job_file(self, n):
943 943 job = IPControllerJob(config=self.config)
944 944
945 945 t = IPControllerTask(config=self.config)
946 946 # The task's work directory is *not* the actual work directory of
947 947 # the controller. It is used as the base path for the stdout/stderr
948 948 # files that the scheduler redirects to.
949 949 t.work_directory = self.profile_dir
950 950 # Add the profile_dir and cluster_id args, plus the extra controller args.
951 951 t.controller_args.extend(self.cluster_args)
952 952 t.controller_args.extend(self.controller_args)
953 953 job.add_task(t)
954 954
955 955 self.log.debug("Writing job description file: %s", self.job_file)
956 956 job.write(self.job_file)
957 957
958 958 @property
959 959 def job_file(self):
960 960 return os.path.join(self.profile_dir, self.job_file_name)
961 961
962 962 def start(self):
963 963 """Start the controller by profile_dir."""
964 964 return super(WindowsHPCControllerLauncher, self).start(1)
965 965
966 966
967 967 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
968 968
969 969 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
970 970 help="jobfile for ipengines job")
971 971 engine_args = List([], config=False,
972 972 help="extra args to pas to ipengine")
973 973
974 974 def write_job_file(self, n):
975 975 job = IPEngineSetJob(config=self.config)
976 976
977 977 for i in range(n):
978 978 t = IPEngineTask(config=self.config)
979 979 # The task's work directory is *not* the actual work directory of
980 980 # the engine. It is used as the base path for the stdout/stderr
981 981 # files that the scheduler redirects to.
982 982 t.work_directory = self.profile_dir
983 983 # Add the profile_dir and cluster_id args, plus the extra engine args.
984 984 t.engine_args.extend(self.cluster_args)
985 985 t.engine_args.extend(self.engine_args)
986 986 job.add_task(t)
987 987
988 988 self.log.debug("Writing job description file: %s", self.job_file)
989 989 job.write(self.job_file)
990 990
991 991 @property
992 992 def job_file(self):
993 993 return os.path.join(self.profile_dir, self.job_file_name)
994 994
995 995 def start(self, n):
996 996 """Start the controller by profile_dir."""
997 997 return super(WindowsHPCEngineSetLauncher, self).start(n)
998 998
999 999
1000 1000 #-----------------------------------------------------------------------------
1001 1001 # Batch (PBS) system launchers
1002 1002 #-----------------------------------------------------------------------------
1003 1003
1004 1004 class BatchClusterAppMixin(ClusterAppMixin):
1005 1005 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
1006 1006 def _profile_dir_changed(self, name, old, new):
1007 1007 self.context[name] = new
1008 1008 _cluster_id_changed = _profile_dir_changed
1009 1009
1010 1010 def _profile_dir_default(self):
1011 1011 self.context['profile_dir'] = ''
1012 1012 return ''
1013 1013 def _cluster_id_default(self):
1014 1014 self.context['cluster_id'] = ''
1015 1015 return ''
1016 1016
1017 1017
1018 1018 class BatchSystemLauncher(BaseLauncher):
1019 1019 """Launch an external process using a batch system.
1020 1020
1021 1021 This class is designed to work with UNIX batch systems like PBS, LSF,
1022 1022 GridEngine, etc. The overall model is that there are different commands
1023 1023 like qsub, qdel, etc. that handle the starting and stopping of the process.
1024 1024
1025 1025 This class also has the notion of a batch script. The ``batch_template``
1026 1026 attribute can be set to a string that is a template for the batch script.
1027 1027 This template is instantiated using string formatting. Thus the template can
1028 1028 use {n} for the number of instances. Subclasses can add additional variables
1029 1029 to the template dict.
1030 1030 """
1031 1031
1032 1032 # Subclasses must fill these in. See PBSEngineSet
1033 1033 submit_command = List([''], config=True,
1034 1034 help="The name of the command line program used to submit jobs.")
1035 1035 delete_command = List([''], config=True,
1036 1036 help="The name of the command line program used to delete jobs.")
1037 1037 job_id_regexp = CRegExp('', config=True,
1038 1038 help="""A regular expression used to get the job id from the output of the
1039 1039 submit_command.""")
1040 1040 job_id_regexp_group = Integer(0, config=True,
1041 1041 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1042 1042 batch_template = Unicode('', config=True,
1043 1043 help="The string that is the batch script template itself.")
1044 1044 batch_template_file = Unicode(u'', config=True,
1045 1045 help="The file that contains the batch template.")
1046 1046 batch_file_name = Unicode(u'batch_script', config=True,
1047 1047 help="The filename of the instantiated batch script.")
1048 1048 queue = Unicode(u'', config=True,
1049 1049 help="The PBS Queue.")
1050 1050
1051 1051 def _queue_changed(self, name, old, new):
1052 1052 self.context[name] = new
1053 1053
1054 1054 n = Integer(1)
1055 1055 _n_changed = _queue_changed
1056 1056
1057 1057 # not configurable, override in subclasses
1058 1058 # PBS Job Array regex
1059 1059 job_array_regexp = CRegExp('')
1060 1060 job_array_template = Unicode('')
1061 1061 # PBS Queue regex
1062 1062 queue_regexp = CRegExp('')
1063 1063 queue_template = Unicode('')
1064 1064 # The default batch template, override in subclasses
1065 1065 default_template = Unicode('')
1066 1066 # The full path to the instantiated batch script.
1067 1067 batch_file = Unicode(u'')
1068 1068 # the format dict used with batch_template:
1069 1069 context = Dict()
1070 1070
1071 1071 def _context_default(self):
1072 1072 """load the default context with the default values for the basic keys
1073 1073
1074 1074 because the _trait_changed methods only load the context if they
1075 1075 are set to something other than the default value.
1076 1076 """
1077 1077 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1078 1078
1079 1079 # the Formatter instance for rendering the templates:
1080 1080 formatter = Instance(EvalFormatter, (), {})
1081 1081
1082 1082 def find_args(self):
1083 1083 return self.submit_command + [self.batch_file]
1084 1084
1085 1085 def __init__(self, work_dir=u'.', config=None, **kwargs):
1086 1086 super(BatchSystemLauncher, self).__init__(
1087 1087 work_dir=work_dir, config=config, **kwargs
1088 1088 )
1089 1089 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1090 1090
1091 1091 def parse_job_id(self, output):
1092 1092 """Take the output of the submit command and return the job id."""
1093 1093 m = self.job_id_regexp.search(output)
1094 1094 if m is not None:
1095 1095 job_id = m.group(self.job_id_regexp_group)
1096 1096 else:
1097 1097 raise LauncherError("Job id couldn't be determined: %s" % output)
1098 1098 self.job_id = job_id
1099 1099 self.log.info('Job submitted with job id: %r', job_id)
1100 1100 return job_id
1101 1101
1102 1102 def write_batch_script(self, n):
1103 1103 """Instantiate and write the batch script to the work_dir."""
1104 1104 self.n = n
1105 1105 # first priority is batch_template if set
1106 1106 if self.batch_template_file and not self.batch_template:
1107 1107 # second priority is batch_template_file
1108 1108 with open(self.batch_template_file) as f:
1109 1109 self.batch_template = f.read()
1110 1110 if not self.batch_template:
1111 1111 # third (last) priority is default_template
1112 1112 self.batch_template = self.default_template
1113 1113 # add jobarray or queue lines to user-specified template
1114 1114 # note that this is *only* when user did not specify a template.
1115 1115 self._insert_queue_in_script()
1116 1116 self._insert_job_array_in_script()
1117 1117 script_as_string = self.formatter.format(self.batch_template, **self.context)
1118 1118 self.log.debug('Writing batch script: %s', self.batch_file)
1119 1119 with open(self.batch_file, 'w') as f:
1120 1120 f.write(script_as_string)
1121 1121 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1122 1122
1123 1123 def _insert_queue_in_script(self):
1124 1124 """Inserts a queue if required into the batch script.
1125 1125 """
1126 1126 if self.queue and not self.queue_regexp.search(self.batch_template):
1127 1127 self.log.debug("adding PBS queue settings to batch script")
1128 1128 firstline, rest = self.batch_template.split('\n',1)
1129 1129 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1130 1130
1131 1131 def _insert_job_array_in_script(self):
1132 1132 """Inserts a job array if required into the batch script.
1133 1133 """
1134 1134 if not self.job_array_regexp.search(self.batch_template):
1135 1135 self.log.debug("adding job array settings to batch script")
1136 1136 firstline, rest = self.batch_template.split('\n',1)
1137 1137 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1138 1138
1139 1139 def start(self, n):
1140 1140 """Start n copies of the process using a batch system."""
1141 1141 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1142 1142 # Here we save profile_dir in the context so it
1143 1143 # can be used in the batch script template as {profile_dir}
1144 1144 self.write_batch_script(n)
1145 1145 output = check_output(self.args, env=os.environ)
1146 1146 output = output.decode(DEFAULT_ENCODING, 'replace')
1147 1147
1148 1148 job_id = self.parse_job_id(output)
1149 1149 self.notify_start(job_id)
1150 1150 return job_id
1151 1151
1152 1152 def stop(self):
1153 1153 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1154 1154 output = output.decode(DEFAULT_ENCODING, 'replace')
1155 1155 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1156 1156 return output
1157 1157
1158 1158
1159 1159 class PBSLauncher(BatchSystemLauncher):
1160 1160 """A BatchSystemLauncher subclass for PBS."""
1161 1161
1162 1162 submit_command = List(['qsub'], config=True,
1163 1163 help="The PBS submit command ['qsub']")
1164 1164 delete_command = List(['qdel'], config=True,
1165 1165 help="The PBS delete command ['qsub']")
1166 1166 job_id_regexp = CRegExp(r'\d+', config=True,
1167 1167 help="Regular expresion for identifying the job ID [r'\d+']")
1168 1168
1169 1169 batch_file = Unicode(u'')
1170 1170 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1171 1171 job_array_template = Unicode('#PBS -t 1-{n}')
1172 1172 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1173 1173 queue_template = Unicode('#PBS -q {queue}')
1174 1174
1175 1175
1176 1176 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1177 1177 """Launch a controller using PBS."""
1178 1178
1179 1179 batch_file_name = Unicode(u'pbs_controller', config=True,
1180 1180 help="batch file name for the controller job.")
1181 1181 default_template= Unicode("""#!/bin/sh
1182 1182 #PBS -V
1183 1183 #PBS -N ipcontroller
1184 1184 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1185 1185 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1186 1186
1187
1188 1187 def start(self):
1189 1188 """Start the controller by profile or profile_dir."""
1190 1189 return super(PBSControllerLauncher, self).start(1)
1191 1190
1192 1191
1193 1192 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1194 1193 """Launch Engines using PBS"""
1195 1194 batch_file_name = Unicode(u'pbs_engines', config=True,
1196 1195 help="batch file name for the engine(s) job.")
1197 1196 default_template= Unicode(u"""#!/bin/sh
1198 1197 #PBS -V
1199 1198 #PBS -N ipengine
1200 1199 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1201 1200 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1202 1201
1203 def start(self, n):
1204 """Start n engines by profile or profile_dir."""
1205 return super(PBSEngineSetLauncher, self).start(n)
1206
1207 1202
1208 1203 #SGE is very similar to PBS
1209 1204
1210 1205 class SGELauncher(PBSLauncher):
1211 1206 """Sun GridEngine is a PBS clone with slightly different syntax"""
1212 1207 job_array_regexp = CRegExp('#\$\W+\-t')
1213 1208 job_array_template = Unicode('#$ -t 1-{n}')
1214 1209 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1215 1210 queue_template = Unicode('#$ -q {queue}')
1216 1211
1217 1212
1218 1213 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1219 1214 """Launch a controller using SGE."""
1220 1215
1221 1216 batch_file_name = Unicode(u'sge_controller', config=True,
1222 1217 help="batch file name for the ipontroller job.")
1223 1218 default_template= Unicode(u"""#$ -V
1224 1219 #$ -S /bin/sh
1225 1220 #$ -N ipcontroller
1226 1221 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1227 1222 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1228 1223
1229 1224 def start(self):
1230 1225 """Start the controller by profile or profile_dir."""
1231 1226 return super(SGEControllerLauncher, self).start(1)
1232 1227
1233 1228
1234 1229 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1235 1230 """Launch Engines with SGE"""
1236 1231 batch_file_name = Unicode(u'sge_engines', config=True,
1237 1232 help="batch file name for the engine(s) job.")
1238 1233 default_template = Unicode("""#$ -V
1239 1234 #$ -S /bin/sh
1240 1235 #$ -N ipengine
1241 1236 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1242 1237 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1243 1238
1244 def start(self, n):
1245 """Start n engines by profile or profile_dir."""
1246 return super(SGEEngineSetLauncher, self).start(n)
1247
1248 1239
1249 1240 # LSF launchers
1250 1241
1251 1242 class LSFLauncher(BatchSystemLauncher):
1252 1243 """A BatchSystemLauncher subclass for LSF."""
1253 1244
1254 1245 submit_command = List(['bsub'], config=True,
1255 1246 help="The PBS submit command ['bsub']")
1256 1247 delete_command = List(['bkill'], config=True,
1257 1248 help="The PBS delete command ['bkill']")
1258 1249 job_id_regexp = CRegExp(r'\d+', config=True,
1259 1250 help="Regular expresion for identifying the job ID [r'\d+']")
1260 1251
1261 1252 batch_file = Unicode(u'')
1262 1253 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1263 1254 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1264 1255 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1265 1256 queue_template = Unicode('#BSUB -q {queue}')
1266 1257
1267 1258 def start(self, n):
1268 1259 """Start n copies of the process using LSF batch system.
1269 1260 This can't inherit from the base class because bsub expects
1270 1261 to be piped a shell script in order to honor the #BSUB directives:
1271 1262 bsub < script
1272 1263 """
1273 1264 # Here we save profile_dir in the context so it
1274 1265 # can be used in the batch script template as {profile_dir}
1275 1266 self.write_batch_script(n)
1276 1267 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1277 1268 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1278 1269 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1279 1270 output,err = p.communicate()
1280 1271 output = output.decode(DEFAULT_ENCODING, 'replace')
1281 1272 job_id = self.parse_job_id(output)
1282 1273 self.notify_start(job_id)
1283 1274 return job_id
1284 1275
1285 1276
1286 1277 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1287 1278 """Launch a controller using LSF."""
1288 1279
1289 1280 batch_file_name = Unicode(u'lsf_controller', config=True,
1290 1281 help="batch file name for the controller job.")
1291 1282 default_template= Unicode("""#!/bin/sh
1292 1283 #BSUB -J ipcontroller
1293 1284 #BSUB -oo ipcontroller.o.%%J
1294 1285 #BSUB -eo ipcontroller.e.%%J
1295 1286 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1296 1287 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1297 1288
1298 1289 def start(self):
1299 1290 """Start the controller by profile or profile_dir."""
1300 1291 return super(LSFControllerLauncher, self).start(1)
1301 1292
1302 1293
1303 1294 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1304 1295 """Launch Engines using LSF"""
1305 1296 batch_file_name = Unicode(u'lsf_engines', config=True,
1306 1297 help="batch file name for the engine(s) job.")
1307 1298 default_template= Unicode(u"""#!/bin/sh
1308 1299 #BSUB -oo ipengine.o.%%J
1309 1300 #BSUB -eo ipengine.e.%%J
1310 1301 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1311 1302 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1312 1303
1313 def start(self, n):
1314 """Start n engines by profile or profile_dir."""
1315 return super(LSFEngineSetLauncher, self).start(n)
1316
1317 1304
1318 1305 # Condor requires that we launch the ipengine/ipcontroller scripts rather
1319 1306 # than the python instance, but otherwise is very similar to PBS
1320 1307
1321 1308 class CondorLauncher(BatchSystemLauncher):
1322 1309 """A BatchSystemLauncher subclass for Condor."""
1323 1310
1324 1311 submit_command = List(['condor_submit'], config=True,
1325 1312 help="The Condor submit command ['condor_submit']")
1326 1313 delete_command = List(['condor_rm'], config=True,
1327 1314 help="The Condor delete command ['condor_rm']")
1328 1315 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1329 1316 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1330 1317 job_id_regexp_group = Integer(1, config=True,
1331 1318 help="""The group we wish to match in job_id_regexp [1]""")
1332 1319
1333 1320 job_array_regexp = CRegExp('queue\W+\$')
1334 1321 job_array_template = Unicode('queue {n}')
1335 1322
1336 1323
1337 1324 def _insert_job_array_in_script(self):
1338 1325 """Inserts a job array if required into the batch script.
1339 1326 """
1340 1327 if not self.job_array_regexp.search(self.batch_template):
1341 1328 self.log.debug("adding job array settings to batch script")
1342 1329 # Condor requires that the job array goes at the bottom of the script
1343 1330 self.batch_template = '\n'.join([self.batch_template,
1344 1331 self.job_array_template])
1345 1332
1346 1333 def _insert_queue_in_script(self):
1347 1334 """AFAIK, Condor doesn't have a concept of multiple queues that can be
1348 1335 specified in the script.
1349 1336 """
1350 1337 pass
1351 1338
1352 1339
1353 1340 class CondorControllerLauncher(CondorLauncher, BatchClusterAppMixin):
1354 1341 """Launch a controller using Condor."""
1355 1342
1356 1343 batch_file_name = Unicode(u'condor_controller', config=True,
1357 1344 help="batch file name for the controller job.")
1358 1345 default_template = Unicode(r"""
1359 1346 universe = vanilla
1360 1347 executable = %s
1361 1348 # by default we expect a shared file system
1362 1349 transfer_executable = False
1363 1350 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1364 1351 """ % condor_ipcontroller_cmd_argv)
1365 1352
1366 1353 def start(self):
1367 1354 """Start the controller by profile or profile_dir."""
1368 1355 return super(CondorControllerLauncher, self).start(1)
1369 1356
1370 1357
1371 1358 class CondorEngineSetLauncher(CondorLauncher, BatchClusterAppMixin):
1372 1359 """Launch Engines using Condor"""
1373 1360 batch_file_name = Unicode(u'condor_engines', config=True,
1374 1361 help="batch file name for the engine(s) job.")
1375 1362 default_template = Unicode("""
1376 1363 universe = vanilla
1377 1364 executable = %s
1378 1365 # by default we expect a shared file system
1379 1366 transfer_executable = False
1380 1367 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1381 1368 """ % condor_ipengine_cmd_argv)
1382 1369
1383 def start(self, n):
1384 """Start n engines by profile or profile_dir."""
1385 return super(CondorEngineSetLauncher, self).start(n)
1386
1387 1370
1388 1371 #-----------------------------------------------------------------------------
1389 1372 # A launcher for ipcluster itself!
1390 1373 #-----------------------------------------------------------------------------
1391 1374
1392 1375
1393 1376 class IPClusterLauncher(LocalProcessLauncher):
1394 1377 """Launch the ipcluster program in an external process."""
1395 1378
1396 1379 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1397 1380 help="Popen command for ipcluster")
1398 1381 ipcluster_args = List(
1399 1382 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1400 1383 help="Command line arguments to pass to ipcluster.")
1401 1384 ipcluster_subcommand = Unicode('start')
1402 1385 profile = Unicode('default')
1403 1386 n = Integer(2)
1404 1387
1405 1388 def find_args(self):
1406 1389 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1407 1390 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1408 1391 self.ipcluster_args
1409 1392
1410 1393 def start(self):
1411 1394 return super(IPClusterLauncher, self).start()
1412 1395
1413 1396 #-----------------------------------------------------------------------------
1414 1397 # Collections of launchers
1415 1398 #-----------------------------------------------------------------------------
1416 1399
1417 1400 local_launchers = [
1418 1401 LocalControllerLauncher,
1419 1402 LocalEngineLauncher,
1420 1403 LocalEngineSetLauncher,
1421 1404 ]
1422 1405 mpi_launchers = [
1423 1406 MPILauncher,
1424 1407 MPIControllerLauncher,
1425 1408 MPIEngineSetLauncher,
1426 1409 ]
1427 1410 ssh_launchers = [
1428 1411 SSHLauncher,
1429 1412 SSHControllerLauncher,
1430 1413 SSHEngineLauncher,
1431 1414 SSHEngineSetLauncher,
1432 1415 SSHProxyEngineSetLauncher,
1433 1416 ]
1434 1417 winhpc_launchers = [
1435 1418 WindowsHPCLauncher,
1436 1419 WindowsHPCControllerLauncher,
1437 1420 WindowsHPCEngineSetLauncher,
1438 1421 ]
1439 1422 pbs_launchers = [
1440 1423 PBSLauncher,
1441 1424 PBSControllerLauncher,
1442 1425 PBSEngineSetLauncher,
1443 1426 ]
1444 1427 sge_launchers = [
1445 1428 SGELauncher,
1446 1429 SGEControllerLauncher,
1447 1430 SGEEngineSetLauncher,
1448 1431 ]
1449 1432 lsf_launchers = [
1450 1433 LSFLauncher,
1451 1434 LSFControllerLauncher,
1452 1435 LSFEngineSetLauncher,
1453 1436 ]
1454 1437 condor_launchers = [
1455 1438 CondorLauncher,
1456 1439 CondorControllerLauncher,
1457 1440 CondorEngineSetLauncher,
1458 1441 ]
1459 1442 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1460 1443 + pbs_launchers + sge_launchers + lsf_launchers + condor_launchers