remove unnecessary initial assignments in launchers...
Min RK
@@ -1,1448 +1,1445 b''
1 1 # encoding: utf-8
2 2 """Facilities for launching IPython processes asynchronously."""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7 import copy
8 8 import logging
9 9 import os
10 10 import pipes
11 11 import stat
12 12 import sys
13 13 import time
14 14
15 15 # signal imports, handling various platforms, versions
16 16
17 17 from signal import SIGINT, SIGTERM
18 18 try:
19 19 from signal import SIGKILL
20 20 except ImportError:
21 21 # Windows
22 22 SIGKILL=SIGTERM
23 23
24 24 try:
25 25     # CTRL_C_EVENT is available on Windows for Python >= 2.7, 3.2
26 26 from signal import CTRL_C_EVENT as SIGINT
27 27 except ImportError:
28 28 pass
29 29
30 30 from subprocess import Popen, PIPE, STDOUT
31 31 try:
32 32 from subprocess import check_output
33 33 except ImportError:
34 34 # pre-2.7, define check_output with Popen
35 35 def check_output(*args, **kwargs):
36 36 kwargs.update(dict(stdout=PIPE))
37 37 p = Popen(*args, **kwargs)
38 38 out,err = p.communicate()
39 39 return out
40 40
41 41 from zmq.eventloop import ioloop
42 42
43 43 from IPython.config.application import Application
44 44 from IPython.config.configurable import LoggingConfigurable
45 45 from IPython.utils.text import EvalFormatter
46 46 from IPython.utils.traitlets import (
47 47 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
48 48 )
49 49 from IPython.utils.encoding import DEFAULT_ENCODING
50 50 from IPython.utils.path import get_home_dir, ensure_dir_exists
51 51 from IPython.utils.process import find_cmd, FindCmdError
52 52 from IPython.utils.py3compat import iteritems, itervalues
53 53
54 54 from .win32support import forward_read_events
55 55
56 56 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
57 57
58 58 WINDOWS = os.name == 'nt'
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Paths to the kernel apps
62 62 #-----------------------------------------------------------------------------
63 63
64 64 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
65 65
66 66 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
67 67
68 68 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
69 69
70 70 if WINDOWS and sys.version_info < (3,):
71 71 # `python -m package` doesn't work on Windows Python 2
72 72 # due to weird multiprocessing bugs
73 73 # and python -m module puts classes in the `__main__` module,
74 74 # so instance checks get confused
75 75 ipengine_cmd_argv = [sys.executable, "-c", "from IPython.parallel.engine.__main__ import main; main()"]
76 76 ipcontroller_cmd_argv = [sys.executable, "-c", "from IPython.parallel.controller.__main__ import main; main()"]
77 77
78 78 #-----------------------------------------------------------------------------
79 79 # Base launchers and errors
80 80 #-----------------------------------------------------------------------------
81 81
82 82 class LauncherError(Exception):
83 83 pass
84 84
85 85
86 86 class ProcessStateError(LauncherError):
87 87 pass
88 88
89 89
90 90 class UnknownStatus(LauncherError):
91 91 pass
92 92
93 93
94 94 class BaseLauncher(LoggingConfigurable):
95 95     """An abstraction for starting, stopping and signaling a process."""
96 96
97 97 # In all of the launchers, the work_dir is where child processes will be
98 98     # run. This will usually be the profile_dir, but may not be. Any work_dir
99 99 # passed into the __init__ method will override the config value.
100 100 # This should not be used to set the work_dir for the actual engine
101 101 # and controller. Instead, use their own config files or the
102 102 # controller_args, engine_args attributes of the launchers to add
103 103 # the work_dir option.
104 104 work_dir = Unicode(u'.')
105 105 loop = Instance('zmq.eventloop.ioloop.IOLoop')
106 106
107 107 start_data = Any()
108 108 stop_data = Any()
109 109
110 110 def _loop_default(self):
111 111 return ioloop.IOLoop.instance()
112 112
113 113 def __init__(self, work_dir=u'.', config=None, **kwargs):
114 114 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
115 115 self.state = 'before' # can be before, running, after
116 116 self.stop_callbacks = []
117 self.start_data = None
118 self.stop_data = None
119 117
120 118 @property
121 119 def args(self):
122 120 """A list of cmd and args that will be used to start the process.
123 121
124 122 This is what is passed to :func:`spawnProcess` and the first element
125 123 will be the process name.
126 124 """
127 125 return self.find_args()
128 126
129 127 def find_args(self):
130 128 """The ``.args`` property calls this to find the args list.
131 129
132 130         Subclasses should implement this to construct the cmd and args.
133 131 """
134 132 raise NotImplementedError('find_args must be implemented in a subclass')
135 133
136 134 @property
137 135 def arg_str(self):
138 136 """The string form of the program arguments."""
139 137 return ' '.join(self.args)
140 138
141 139 @property
142 140 def running(self):
143 141 """Am I running."""
144 142 if self.state == 'running':
145 143 return True
146 144 else:
147 145 return False
148 146
149 147 def start(self):
150 148 """Start the process."""
151 149 raise NotImplementedError('start must be implemented in a subclass')
152 150
153 151 def stop(self):
154 152 """Stop the process and notify observers of stopping.
155 153
156 154 This method will return None immediately.
157 155 To observe the actual process stopping, see :meth:`on_stop`.
158 156 """
159 157 raise NotImplementedError('stop must be implemented in a subclass')
160 158
161 159 def on_stop(self, f):
162 160 """Register a callback to be called with this Launcher's stop_data
163 161 when the process actually finishes.
164 162 """
165 163 if self.state=='after':
166 164 return f(self.stop_data)
167 165 else:
168 166 self.stop_callbacks.append(f)
169 167
170 168 def notify_start(self, data):
171 169 """Call this to trigger startup actions.
172 170
173 171 This logs the process startup and sets the state to 'running'. It is
174 172 a pass-through so it can be used as a callback.
175 173 """
176 174
177 175 self.log.debug('Process %r started: %r', self.args[0], data)
178 176 self.start_data = data
179 177 self.state = 'running'
180 178 return data
181 179
182 180 def notify_stop(self, data):
183 181 """Call this to trigger process stop actions.
184 182
185 183 This logs the process stopping and sets the state to 'after'. Call
186 184 this to trigger callbacks registered via :meth:`on_stop`."""
187 185
188 186 self.log.debug('Process %r stopped: %r', self.args[0], data)
189 187 self.stop_data = data
190 188 self.state = 'after'
191 189 for i in range(len(self.stop_callbacks)):
192 190 d = self.stop_callbacks.pop()
193 191 d(data)
194 192 return data
195 193
196 194 def signal(self, sig):
197 195 """Signal the process.
198 196
199 197 Parameters
200 198 ----------
201 199 sig : str or int
202 200 'KILL', 'INT', etc., or any signal number
203 201 """
204 202 raise NotImplementedError('signal must be implemented in a subclass')
205 203
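# --- Editor's sketch (illustrative only; not part of this file or of the diff) ---
# The BaseLauncher contract above: subclasses implement find_args(), start()
# and stop(), and call notify_start()/notify_stop() to drive the state machine
# ('before' -> 'running' -> 'after') and fire callbacks registered via on_stop().
# A hypothetical minimal subclass:

class SketchNoopLauncher(BaseLauncher):
    """Toy launcher that 'starts' and 'stops' immediately, for illustration."""

    def find_args(self):
        # the first element is treated as the process name in log messages
        return ['true']

    def start(self):
        return self.notify_start({'pid': None})

    def stop(self):
        return self.notify_stop({'exit_code': 0})

# usage sketch:
#   launcher = SketchNoopLauncher()
#   launcher.on_stop(lambda data: data)  # callback receives the stop_data dict
#   launcher.start()
#   launcher.stop()
# ------------------------------------------------------------------------------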
206 204 class ClusterAppMixin(HasTraits):
207 205 """MixIn for cluster args as traits"""
208 206 profile_dir=Unicode('')
209 207 cluster_id=Unicode('')
210 208
211 209 @property
212 210 def cluster_args(self):
213 211 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
214 212
215 213 class ControllerMixin(ClusterAppMixin):
216 214 controller_cmd = List(ipcontroller_cmd_argv, config=True,
217 215 help="""Popen command to launch ipcontroller.""")
218 216 # Command line arguments to ipcontroller.
219 217 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
220 218 help="""command-line args to pass to ipcontroller""")
221 219
222 220 class EngineMixin(ClusterAppMixin):
223 221 engine_cmd = List(ipengine_cmd_argv, config=True,
224 222 help="""command to launch the Engine.""")
225 223 # Command line arguments for ipengine.
226 224 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
227 225 help="command-line arguments to pass to ipengine"
228 226 )
229 227
230 228
231 229 #-----------------------------------------------------------------------------
232 230 # Local process launchers
233 231 #-----------------------------------------------------------------------------
234 232
235 233
236 234 class LocalProcessLauncher(BaseLauncher):
237 235 """Start and stop an external process in an asynchronous manner.
238 236
239 237 This will launch the external process with a working directory of
240 238 ``self.work_dir``.
241 239 """
242 240
243 241     # This is used to construct self.args, which is passed to
244 242 # spawnProcess.
245 243 cmd_and_args = List([])
246 244 poll_frequency = Integer(100) # in ms
247 245
248 246 def __init__(self, work_dir=u'.', config=None, **kwargs):
249 247 super(LocalProcessLauncher, self).__init__(
250 248 work_dir=work_dir, config=config, **kwargs
251 249 )
252 250 self.process = None
253 251 self.poller = None
254 252
255 253 def find_args(self):
256 254 return self.cmd_and_args
257 255
258 256 def start(self):
259 257 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
260 258 if self.state == 'before':
261 259 self.process = Popen(self.args,
262 260 stdout=PIPE,stderr=PIPE,stdin=PIPE,
263 261 env=os.environ,
264 262 cwd=self.work_dir
265 263 )
266 264 if WINDOWS:
267 265 self.stdout = forward_read_events(self.process.stdout)
268 266 self.stderr = forward_read_events(self.process.stderr)
269 267 else:
270 268 self.stdout = self.process.stdout.fileno()
271 269 self.stderr = self.process.stderr.fileno()
272 270 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
273 271 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
274 272 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
275 273 self.poller.start()
276 274 self.notify_start(self.process.pid)
277 275 else:
278 276 s = 'The process was already started and has state: %r' % self.state
279 277 raise ProcessStateError(s)
280 278
281 279 def stop(self):
282 280 return self.interrupt_then_kill()
283 281
284 282 def signal(self, sig):
285 283 if self.state == 'running':
286 284 if WINDOWS and sig != SIGINT:
287 285 # use Windows tree-kill for better child cleanup
288 286 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
289 287 else:
290 288 self.process.send_signal(sig)
291 289
292 290 def interrupt_then_kill(self, delay=2.0):
293 291 """Send INT, wait a delay and then send KILL."""
294 292 try:
295 293 self.signal(SIGINT)
296 294 except Exception:
297 295 self.log.debug("interrupt failed")
298 296 pass
299 297 self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL))
300 298
301 299 # callbacks, etc:
302 300
303 301 def handle_stdout(self, fd, events):
304 302 if WINDOWS:
305 303 line = self.stdout.recv()
306 304 else:
307 305 line = self.process.stdout.readline()
308 306 # a stopped process will be readable but return empty strings
309 307 if line:
310 308 self.log.debug(line[:-1])
311 309 else:
312 310 self.poll()
313 311
314 312 def handle_stderr(self, fd, events):
315 313 if WINDOWS:
316 314 line = self.stderr.recv()
317 315 else:
318 316 line = self.process.stderr.readline()
319 317 # a stopped process will be readable but return empty strings
320 318 if line:
321 319 self.log.debug(line[:-1])
322 320 else:
323 321 self.poll()
324 322
325 323 def poll(self):
326 324 status = self.process.poll()
327 325 if status is not None:
328 326 self.poller.stop()
329 327 self.loop.remove_handler(self.stdout)
330 328 self.loop.remove_handler(self.stderr)
331 329 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
332 330 return status
333 331
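# --- Editor's sketch (illustrative only; not part of this file or of the diff) ---
# LocalProcessLauncher wires a Popen'd child into the zmq IOLoop: stdout/stderr
# become read handlers and a PeriodicCallback polls for exit, so notify_stop()
# fires with dict(exit_code=..., pid=...). A hypothetical standalone use:

def sketch_run_local_process():
    loop = ioloop.IOLoop.instance()
    lp = LocalProcessLauncher(work_dir=u'.')
    lp.cmd_and_args = [sys.executable, '-c', 'print("hello")']
    # stop the loop once the child has exited
    lp.on_stop(lambda data: loop.stop())
    lp.start()
    loop.start()
# ------------------------------------------------------------------------------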
334 332 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
335 333 """Launch a controller as a regular external process."""
336 334
337 335 def find_args(self):
338 336 return self.controller_cmd + self.cluster_args + self.controller_args
339 337
340 338 def start(self):
341 339 """Start the controller by profile_dir."""
342 340 return super(LocalControllerLauncher, self).start()
343 341
344 342
345 343 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
346 344     """Launch a single engine as a regular external process."""
347 345
348 346 def find_args(self):
349 347 return self.engine_cmd + self.cluster_args + self.engine_args
350 348
351 349
352 350 class LocalEngineSetLauncher(LocalEngineLauncher):
353 351 """Launch a set of engines as regular external processes."""
354 352
355 353 delay = CFloat(0.1, config=True,
356 354 help="""delay (in seconds) between starting each engine after the first.
357 355 This can help force the engines to get their ids in order, or limit
358 356 process flood when starting many engines."""
359 357 )
360 358
361 359 # launcher class
362 360 launcher_class = LocalEngineLauncher
363 361
364 362 launchers = Dict()
365 363 stop_data = Dict()
366 364
367 365 def __init__(self, work_dir=u'.', config=None, **kwargs):
368 366 super(LocalEngineSetLauncher, self).__init__(
369 367 work_dir=work_dir, config=config, **kwargs
370 368 )
371 self.stop_data = {}
372 369
373 370 def start(self, n):
374 371 """Start n engines by profile or profile_dir."""
375 372 dlist = []
376 373 for i in range(n):
377 374 if i > 0:
378 375 time.sleep(self.delay)
379 376 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
380 377 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
381 378 )
382 379
383 380 # Copy the engine args over to each engine launcher.
384 381 el.engine_cmd = copy.deepcopy(self.engine_cmd)
385 382 el.engine_args = copy.deepcopy(self.engine_args)
386 383 el.on_stop(self._notice_engine_stopped)
387 384 d = el.start()
388 385 self.launchers[i] = el
389 386 dlist.append(d)
390 387 self.notify_start(dlist)
391 388 return dlist
392 389
393 390 def find_args(self):
394 391 return ['engine set']
395 392
396 393 def signal(self, sig):
397 394 dlist = []
398 395 for el in itervalues(self.launchers):
399 396 d = el.signal(sig)
400 397 dlist.append(d)
401 398 return dlist
402 399
403 400 def interrupt_then_kill(self, delay=1.0):
404 401 dlist = []
405 402 for el in itervalues(self.launchers):
406 403 d = el.interrupt_then_kill(delay)
407 404 dlist.append(d)
408 405 return dlist
409 406
410 407 def stop(self):
411 408 return self.interrupt_then_kill()
412 409
413 410 def _notice_engine_stopped(self, data):
414 411 pid = data['pid']
415 412 for idx,el in iteritems(self.launchers):
416 413 if el.process.pid == pid:
417 414 break
418 415 self.launchers.pop(idx)
419 416 self.stop_data[idx] = data
420 417 if not self.launchers:
421 418 self.notify_stop(self.stop_data)
422 419
423 420
424 421 #-----------------------------------------------------------------------------
425 422 # MPI launchers
426 423 #-----------------------------------------------------------------------------
427 424
428 425
429 426 class MPILauncher(LocalProcessLauncher):
430 427 """Launch an external process using mpiexec."""
431 428
432 429 mpi_cmd = List(['mpiexec'], config=True,
433 430 help="The mpiexec command to use in starting the process."
434 431 )
435 432 mpi_args = List([], config=True,
436 433 help="The command line arguments to pass to mpiexec."
437 434 )
438 435 program = List(['date'],
439 436 help="The program to start via mpiexec.")
440 437 program_args = List([],
441 438         help="The command line arguments to the program."
442 439 )
443 440 n = Integer(1)
444 441
445 442 def __init__(self, *args, **kwargs):
446 443 # deprecation for old MPIExec names:
447 444 config = kwargs.get('config', {})
448 445 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
449 446 deprecated = config.get(oldname)
450 447 if deprecated:
451 448 newname = oldname.replace('MPIExec', 'MPI')
452 449 config[newname].update(deprecated)
453 450 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
454 451
455 452 super(MPILauncher, self).__init__(*args, **kwargs)
456 453
457 454 def find_args(self):
458 455 """Build self.args using all the fields."""
459 456 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
460 457 self.program + self.program_args
461 458
462 459 def start(self, n):
463 460 """Start n instances of the program using mpiexec."""
464 461 self.n = n
465 462 return super(MPILauncher, self).start()
466 463
467 464
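# --- Editor's sketch (illustrative only; not part of this file or of the diff) ---
# MPILauncher.__init__ above migrates config written against the old MPIExec*
# section names into the new MPI* sections (and logs a deprecation warning).
# In an ipcluster_config.py the two spellings below are therefore equivalent;
# the values shown are hypothetical:
#
#     # deprecated spelling
#     c.MPIExecEngineSetLauncher.mpi_args = ['--host', 'node1,node2']
#
#     # current spelling
#     c.MPIEngineSetLauncher.mpi_args = ['--host', 'node1,node2']
# ------------------------------------------------------------------------------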
468 465 class MPIControllerLauncher(MPILauncher, ControllerMixin):
469 466 """Launch a controller using mpiexec."""
470 467
471 468 # alias back to *non-configurable* program[_args] for use in find_args()
472 469 # this way all Controller/EngineSetLaunchers have the same form, rather
473 470 # than *some* having `program_args` and others `controller_args`
474 471 @property
475 472 def program(self):
476 473 return self.controller_cmd
477 474
478 475 @property
479 476 def program_args(self):
480 477 return self.cluster_args + self.controller_args
481 478
482 479 def start(self):
483 480 """Start the controller by profile_dir."""
484 481 return super(MPIControllerLauncher, self).start(1)
485 482
486 483
487 484 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
488 485 """Launch engines using mpiexec"""
489 486
490 487 # alias back to *non-configurable* program[_args] for use in find_args()
491 488 # this way all Controller/EngineSetLaunchers have the same form, rather
492 489 # than *some* having `program_args` and others `controller_args`
493 490 @property
494 491 def program(self):
495 492 return self.engine_cmd
496 493
497 494 @property
498 495 def program_args(self):
499 496 return self.cluster_args + self.engine_args
500 497
501 498 def start(self, n):
502 499 """Start n engines by profile or profile_dir."""
503 500 self.n = n
504 501 return super(MPIEngineSetLauncher, self).start(n)
505 502
506 503 # deprecated MPIExec names
507 504 class DeprecatedMPILauncher(object):
508 505 def warn(self):
509 506 oldname = self.__class__.__name__
510 507 newname = oldname.replace('MPIExec', 'MPI')
511 508 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
512 509
513 510 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
514 511 """Deprecated, use MPILauncher"""
515 512 def __init__(self, *args, **kwargs):
516 513 super(MPIExecLauncher, self).__init__(*args, **kwargs)
517 514 self.warn()
518 515
519 516 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
520 517 """Deprecated, use MPIControllerLauncher"""
521 518 def __init__(self, *args, **kwargs):
522 519 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
523 520 self.warn()
524 521
525 522 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
526 523 """Deprecated, use MPIEngineSetLauncher"""
527 524 def __init__(self, *args, **kwargs):
528 525 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
529 526 self.warn()
530 527
531 528
532 529 #-----------------------------------------------------------------------------
533 530 # SSH launchers
534 531 #-----------------------------------------------------------------------------
535 532
536 533 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
537 534
538 535 class SSHLauncher(LocalProcessLauncher):
539 536 """A minimal launcher for ssh.
540 537
541 538 To be useful this will probably have to be extended to use the ``sshx``
542 539 idea for environment variables. There could be other things this needs
543 540 as well.
544 541 """
545 542
546 543 ssh_cmd = List(['ssh'], config=True,
547 544 help="command for starting ssh")
548 545 ssh_args = List(['-tt'], config=True,
549 546 help="args to pass to ssh")
550 547 scp_cmd = List(['scp'], config=True,
551 548 help="command for sending files")
552 549 program = List(['date'],
553 550 help="Program to launch via ssh")
554 551 program_args = List([],
555 552 help="args to pass to remote program")
556 553 hostname = Unicode('', config=True,
557 554 help="hostname on which to launch the program")
558 555 user = Unicode('', config=True,
559 556 help="username for ssh")
560 557 location = Unicode('', config=True,
561 558 help="user@hostname location for ssh in one setting")
562 559 to_fetch = List([], config=True,
563 560 help="List of (remote, local) files to fetch after starting")
564 561 to_send = List([], config=True,
565 562 help="List of (local, remote) files to send before starting")
566 563
567 564 def _hostname_changed(self, name, old, new):
568 565 if self.user:
569 566 self.location = u'%s@%s' % (self.user, new)
570 567 else:
571 568 self.location = new
572 569
573 570 def _user_changed(self, name, old, new):
574 571 self.location = u'%s@%s' % (new, self.hostname)
575 572
576 573 def find_args(self):
577 574 return self.ssh_cmd + self.ssh_args + [self.location] + \
578 575 list(map(pipes.quote, self.program + self.program_args))
579 576
580 577 def _send_file(self, local, remote):
581 578 """send a single file"""
582 579 full_remote = "%s:%s" % (self.location, remote)
583 580 for i in range(10):
584 581 if not os.path.exists(local):
585 582 self.log.debug("waiting for %s" % local)
586 583 time.sleep(1)
587 584 else:
588 585 break
589 586 remote_dir = os.path.dirname(remote)
590 587 self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
591 588 check_output(self.ssh_cmd + self.ssh_args + \
592 589 [self.location, 'mkdir', '-p', '--', remote_dir]
593 590 )
594 591 self.log.info("sending %s to %s", local, full_remote)
595 592 check_output(self.scp_cmd + [local, full_remote])
596 593
597 594 def send_files(self):
598 595 """send our files (called before start)"""
599 596 if not self.to_send:
600 597 return
601 598 for local_file, remote_file in self.to_send:
602 599 self._send_file(local_file, remote_file)
603 600
604 601 def _fetch_file(self, remote, local):
605 602 """fetch a single file"""
606 603 full_remote = "%s:%s" % (self.location, remote)
607 604 self.log.info("fetching %s from %s", local, full_remote)
608 605 for i in range(10):
609 606 # wait up to 10s for remote file to exist
610 607 check = check_output(self.ssh_cmd + self.ssh_args + \
611 608 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
612 609 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
613 610 if check == u'no':
614 611 time.sleep(1)
615 612 elif check == u'yes':
616 613 break
617 614 local_dir = os.path.dirname(local)
618 615 ensure_dir_exists(local_dir, 775)
619 616 check_output(self.scp_cmd + [full_remote, local])
620 617
621 618 def fetch_files(self):
622 619 """fetch remote files (called after start)"""
623 620 if not self.to_fetch:
624 621 return
625 622 for remote_file, local_file in self.to_fetch:
626 623 self._fetch_file(remote_file, local_file)
627 624
628 625 def start(self, hostname=None, user=None):
629 626 if hostname is not None:
630 627 self.hostname = hostname
631 628 if user is not None:
632 629 self.user = user
633 630
634 631 self.send_files()
635 632 super(SSHLauncher, self).start()
636 633 self.fetch_files()
637 634
638 635 def signal(self, sig):
639 636 if self.state == 'running':
640 637 # send escaped ssh connection-closer
641 638 self.process.stdin.write('~.')
642 639 self.process.stdin.flush()
643 640
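# --- Editor's sketch (illustrative only; not part of this file or of the diff) ---
# SSHLauncher builds "ssh -tt [user@]host <quoted program + args>" and can stage
# files over scp: to_send is a list of (local, remote) pairs copied before start,
# to_fetch a list of (remote, local) pairs copied after start. A hypothetical
# ipcluster_config.py fragment (host, user and paths are made up):
#
#     c.SSHLauncher.hostname = 'node1.example.org'
#     c.SSHLauncher.user = 'alice'     # location becomes 'alice@node1.example.org'
#     c.SSHLauncher.to_send = [('local.json', '.ipython/profile_default/local.json')]
#     c.SSHLauncher.to_fetch = [('.ipython/profile_default/remote.json', 'remote.json')]
# ------------------------------------------------------------------------------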
644 641 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
645 642
646 643 remote_profile_dir = Unicode('', config=True,
647 644 help="""The remote profile_dir to use.
648 645
649 646 If not specified, use calling profile, stripping out possible leading homedir.
650 647 """)
651 648
652 649 def _profile_dir_changed(self, name, old, new):
653 650 if not self.remote_profile_dir:
654 651 # trigger remote_profile_dir_default logic again,
655 652 # in case it was already triggered before profile_dir was set
656 653 self.remote_profile_dir = self._strip_home(new)
657 654
658 655 @staticmethod
659 656 def _strip_home(path):
660 657 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
661 658 home = get_home_dir()
662 659 if not home.endswith('/'):
663 660 home = home+'/'
664 661
665 662 if path.startswith(home):
666 663 return path[len(home):]
667 664 else:
668 665 return path
669 666
670 667 def _remote_profile_dir_default(self):
671 668 return self._strip_home(self.profile_dir)
672 669
673 670 def _cluster_id_changed(self, name, old, new):
674 671 if new:
675 672 raise ValueError("cluster id not supported by SSH launchers")
676 673
677 674 @property
678 675 def cluster_args(self):
679 676 return ['--profile-dir', self.remote_profile_dir]
680 677
681 678 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
682 679
683 680 # alias back to *non-configurable* program[_args] for use in find_args()
684 681 # this way all Controller/EngineSetLaunchers have the same form, rather
685 682 # than *some* having `program_args` and others `controller_args`
686 683
687 684 def _controller_cmd_default(self):
688 685 return ['ipcontroller']
689 686
690 687 @property
691 688 def program(self):
692 689 return self.controller_cmd
693 690
694 691 @property
695 692 def program_args(self):
696 693 return self.cluster_args + self.controller_args
697 694
698 695 def _to_fetch_default(self):
699 696 return [
700 697 (os.path.join(self.remote_profile_dir, 'security', cf),
701 698 os.path.join(self.profile_dir, 'security', cf),)
702 699 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
703 700 ]
704 701
705 702 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
706 703
707 704 # alias back to *non-configurable* program[_args] for use in find_args()
708 705 # this way all Controller/EngineSetLaunchers have the same form, rather
709 706 # than *some* having `program_args` and others `controller_args`
710 707
711 708 def _engine_cmd_default(self):
712 709 return ['ipengine']
713 710
714 711 @property
715 712 def program(self):
716 713 return self.engine_cmd
717 714
718 715 @property
719 716 def program_args(self):
720 717 return self.cluster_args + self.engine_args
721 718
722 719 def _to_send_default(self):
723 720 return [
724 721 (os.path.join(self.profile_dir, 'security', cf),
725 722 os.path.join(self.remote_profile_dir, 'security', cf))
726 723 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
727 724 ]
728 725
729 726
730 727 class SSHEngineSetLauncher(LocalEngineSetLauncher):
731 728 launcher_class = SSHEngineLauncher
732 729 engines = Dict(config=True,
733 730 help="""dict of engines to launch. This is a dict by hostname of ints,
734 731 corresponding to the number of engines to start on that host.""")
735 732
736 733 def _engine_cmd_default(self):
737 734 return ['ipengine']
738 735
739 736 @property
740 737 def engine_count(self):
741 738 """determine engine count from `engines` dict"""
742 739 count = 0
743 740 for n in itervalues(self.engines):
744 741 if isinstance(n, (tuple,list)):
745 742 n,args = n
746 743 count += n
747 744 return count
748 745
749 746 def start(self, n):
750 747 """Start engines by profile or profile_dir.
751 748 `n` is ignored, and the `engines` config property is used instead.
752 749 """
753 750
754 751 dlist = []
755 752 for host, n in iteritems(self.engines):
756 753 if isinstance(n, (tuple, list)):
757 754 n, args = n
758 755 else:
759 756 args = copy.deepcopy(self.engine_args)
760 757
761 758 if '@' in host:
762 759 user,host = host.split('@',1)
763 760 else:
764 761 user=None
765 762 for i in range(n):
766 763 if i > 0:
767 764 time.sleep(self.delay)
768 765 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
769 766 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
770 767 )
771 768 if i > 0:
772 769 # only send files for the first engine on each host
773 770 el.to_send = []
774 771
775 772 # Copy the engine args over to each engine launcher.
776 773 el.engine_cmd = self.engine_cmd
777 774 el.engine_args = args
778 775 el.on_stop(self._notice_engine_stopped)
779 776 d = el.start(user=user, hostname=host)
780 777 self.launchers[ "%s/%i" % (host,i) ] = el
781 778 dlist.append(d)
782 779 self.notify_start(dlist)
783 780 return dlist
784 781
785 782
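# --- Editor's sketch (illustrative only; not part of this file or of the diff) ---
# SSHEngineSetLauncher.engines maps 'host' or 'user@host' to either an int
# (number of engines) or an (n, args) tuple with per-host engine arguments,
# exactly as engine_count and start() above parse it. A hypothetical example:
#
#     c.SSHEngineSetLauncher.engines = {
#         'node1.example.org': 2,
#         'alice@node2.example.org': (4, ['--log-level=10']),
#     }
# ------------------------------------------------------------------------------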
786 783 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
787 784 """Launcher for calling
788 785 `ipcluster engines` on a remote machine.
789 786
790 787 Requires that remote profile is already configured.
791 788     Requires that the remote profile is already configured.
792 789
793 790 n = Integer()
794 791 ipcluster_cmd = List(['ipcluster'], config=True)
795 792
796 793 @property
797 794 def program(self):
798 795 return self.ipcluster_cmd + ['engines']
799 796
800 797 @property
801 798 def program_args(self):
802 799 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
803 800
804 801 def _to_send_default(self):
805 802 return [
806 803 (os.path.join(self.profile_dir, 'security', cf),
807 804 os.path.join(self.remote_profile_dir, 'security', cf))
808 805 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
809 806 ]
810 807
811 808 def start(self, n):
812 809 self.n = n
813 810 super(SSHProxyEngineSetLauncher, self).start()
814 811
815 812
816 813 #-----------------------------------------------------------------------------
817 814 # Windows HPC Server 2008 scheduler launchers
818 815 #-----------------------------------------------------------------------------
819 816
820 817
821 818 # This is only used on Windows.
822 819 def find_job_cmd():
823 820 if WINDOWS:
824 821 try:
825 822 return find_cmd('job')
826 823 except (FindCmdError, ImportError):
827 824 # ImportError will be raised if win32api is not installed
828 825 return 'job'
829 826 else:
830 827 return 'job'
831 828
832 829
833 830 class WindowsHPCLauncher(BaseLauncher):
834 831
835 832 job_id_regexp = CRegExp(r'\d+', config=True,
836 833 help="""A regular expression used to get the job id from the output of the
837 834 submit_command. """
838 835 )
839 836 job_file_name = Unicode(u'ipython_job.xml', config=True,
840 837 help="The filename of the instantiated job script.")
841 838 # The full path to the instantiated job script. This gets made dynamically
842 839 # by combining the work_dir with the job_file_name.
843 840 job_file = Unicode(u'')
844 841 scheduler = Unicode('', config=True,
845 842 help="The hostname of the scheduler to submit the job to.")
846 843 job_cmd = Unicode(find_job_cmd(), config=True,
847 844 help="The command for submitting jobs.")
848 845
849 846 def __init__(self, work_dir=u'.', config=None, **kwargs):
850 847 super(WindowsHPCLauncher, self).__init__(
851 848 work_dir=work_dir, config=config, **kwargs
852 849 )
853 850
854 851 @property
855 852 def job_file(self):
856 853 return os.path.join(self.work_dir, self.job_file_name)
857 854
858 855 def write_job_file(self, n):
859 856 raise NotImplementedError("Implement write_job_file in a subclass.")
860 857
861 858 def find_args(self):
862 859 return [u'job.exe']
863 860
864 861 def parse_job_id(self, output):
865 862 """Take the output of the submit command and return the job id."""
866 863 m = self.job_id_regexp.search(output)
867 864 if m is not None:
868 865 job_id = m.group()
869 866 else:
870 867 raise LauncherError("Job id couldn't be determined: %s" % output)
871 868 self.job_id = job_id
872 869 self.log.info('Job started with id: %r', job_id)
873 870 return job_id
874 871
875 872 def start(self, n):
876 873 """Start n copies of the process using the Win HPC job scheduler."""
877 874 self.write_job_file(n)
878 875 args = [
879 876 'submit',
880 877 '/jobfile:%s' % self.job_file,
881 878 '/scheduler:%s' % self.scheduler
882 879 ]
883 880 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
884 881
885 882 output = check_output([self.job_cmd]+args,
886 883 env=os.environ,
887 884 cwd=self.work_dir,
888 885 stderr=STDOUT
889 886 )
890 887 output = output.decode(DEFAULT_ENCODING, 'replace')
891 888 job_id = self.parse_job_id(output)
892 889 self.notify_start(job_id)
893 890 return job_id
894 891
895 892 def stop(self):
896 893 args = [
897 894 'cancel',
898 895 self.job_id,
899 896 '/scheduler:%s' % self.scheduler
900 897 ]
901 898 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
902 899 try:
903 900 output = check_output([self.job_cmd]+args,
904 901 env=os.environ,
905 902 cwd=self.work_dir,
906 903 stderr=STDOUT
907 904 )
908 905 output = output.decode(DEFAULT_ENCODING, 'replace')
909 906 except:
910 907 output = u'The job already appears to be stopped: %r' % self.job_id
911 908 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
912 909 return output
913 910
914 911
915 912 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
916 913
917 914 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
918 915 help="WinHPC xml job file.")
919 916 controller_args = List([], config=False,
920 917 help="extra args to pass to ipcontroller")
921 918
922 919 def write_job_file(self, n):
923 920 job = IPControllerJob(parent=self)
924 921
925 922 t = IPControllerTask(parent=self)
926 923         # The task's work directory is *not* the actual work directory of
927 924 # the controller. It is used as the base path for the stdout/stderr
928 925 # files that the scheduler redirects to.
929 926 t.work_directory = self.profile_dir
930 927         # Add the cluster args (profile_dir, cluster_id) and any extra controller args.
931 928 t.controller_args.extend(self.cluster_args)
932 929 t.controller_args.extend(self.controller_args)
933 930 job.add_task(t)
934 931
935 932 self.log.debug("Writing job description file: %s", self.job_file)
936 933 job.write(self.job_file)
937 934
938 935 @property
939 936 def job_file(self):
940 937 return os.path.join(self.profile_dir, self.job_file_name)
941 938
942 939 def start(self):
943 940 """Start the controller by profile_dir."""
944 941 return super(WindowsHPCControllerLauncher, self).start(1)
945 942
946 943
947 944 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
948 945
949 946 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
950 947 help="jobfile for ipengines job")
951 948 engine_args = List([], config=False,
952 949         help="extra args to pass to ipengine")
953 950
954 951 def write_job_file(self, n):
955 952 job = IPEngineSetJob(parent=self)
956 953
957 954 for i in range(n):
958 955 t = IPEngineTask(parent=self)
959 956             # The task's work directory is *not* the actual work directory of
960 957 # the engine. It is used as the base path for the stdout/stderr
961 958 # files that the scheduler redirects to.
962 959 t.work_directory = self.profile_dir
963 960             # Add the cluster args (profile_dir, cluster_id) and any extra engine args.
964 961 t.engine_args.extend(self.cluster_args)
965 962 t.engine_args.extend(self.engine_args)
966 963 job.add_task(t)
967 964
968 965 self.log.debug("Writing job description file: %s", self.job_file)
969 966 job.write(self.job_file)
970 967
971 968 @property
972 969 def job_file(self):
973 970 return os.path.join(self.profile_dir, self.job_file_name)
974 971
975 972 def start(self, n):
976 973         """Start n engines by profile_dir."""
977 974 return super(WindowsHPCEngineSetLauncher, self).start(n)
978 975
979 976
980 977 #-----------------------------------------------------------------------------
981 978 # Batch (PBS) system launchers
982 979 #-----------------------------------------------------------------------------
983 980
984 981 class BatchClusterAppMixin(ClusterAppMixin):
985 982 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
986 983 def _profile_dir_changed(self, name, old, new):
987 984 self.context[name] = new
988 985 _cluster_id_changed = _profile_dir_changed
989 986
990 987 def _profile_dir_default(self):
991 988 self.context['profile_dir'] = ''
992 989 return ''
993 990 def _cluster_id_default(self):
994 991 self.context['cluster_id'] = ''
995 992 return ''
996 993
997 994
998 995 class BatchSystemLauncher(BaseLauncher):
999 996 """Launch an external process using a batch system.
1000 997
1001 998 This class is designed to work with UNIX batch systems like PBS, LSF,
1002 999 GridEngine, etc. The overall model is that there are different commands
1003 1000 like qsub, qdel, etc. that handle the starting and stopping of the process.
1004 1001
1005 1002 This class also has the notion of a batch script. The ``batch_template``
1006 1003 attribute can be set to a string that is a template for the batch script.
1007 1004 This template is instantiated using string formatting. Thus the template can
1008 1005     use {n} for the number of instances. Subclasses can add additional variables
1009 1006 to the template dict.
1010 1007 """
1011 1008
1012 1009 # Subclasses must fill these in. See PBSEngineSet
1013 1010 submit_command = List([''], config=True,
1014 1011 help="The name of the command line program used to submit jobs.")
1015 1012 delete_command = List([''], config=True,
1016 1013 help="The name of the command line program used to delete jobs.")
1017 1014 job_id_regexp = CRegExp('', config=True,
1018 1015 help="""A regular expression used to get the job id from the output of the
1019 1016 submit_command.""")
1020 1017 job_id_regexp_group = Integer(0, config=True,
1021 1018 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1022 1019 batch_template = Unicode('', config=True,
1023 1020 help="The string that is the batch script template itself.")
1024 1021 batch_template_file = Unicode(u'', config=True,
1025 1022 help="The file that contains the batch template.")
1026 1023 batch_file_name = Unicode(u'batch_script', config=True,
1027 1024 help="The filename of the instantiated batch script.")
1028 1025 queue = Unicode(u'', config=True,
1029 1026 help="The PBS Queue.")
1030 1027
1031 1028 def _queue_changed(self, name, old, new):
1032 1029 self.context[name] = new
1033 1030
1034 1031 n = Integer(1)
1035 1032 _n_changed = _queue_changed
1036 1033
1037 1034 # not configurable, override in subclasses
1038 1035 # PBS Job Array regex
1039 1036 job_array_regexp = CRegExp('')
1040 1037 job_array_template = Unicode('')
1041 1038 # PBS Queue regex
1042 1039 queue_regexp = CRegExp('')
1043 1040 queue_template = Unicode('')
1044 1041 # The default batch template, override in subclasses
1045 1042 default_template = Unicode('')
1046 1043 # The full path to the instantiated batch script.
1047 1044 batch_file = Unicode(u'')
1048 1045 # the format dict used with batch_template:
1049 1046 context = Dict()
1050 1047
1051 1048 def _context_default(self):
1052 1049 """load the default context with the default values for the basic keys
1053 1050
1054 1051 because the _trait_changed methods only load the context if they
1055 1052 are set to something other than the default value.
1056 1053 """
1057 1054 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1058 1055
1059 1056 # the Formatter instance for rendering the templates:
1060 1057 formatter = Instance(EvalFormatter, (), {})
1061 1058
1062 1059 def find_args(self):
1063 1060 return self.submit_command + [self.batch_file]
1064 1061
1065 1062 def __init__(self, work_dir=u'.', config=None, **kwargs):
1066 1063 super(BatchSystemLauncher, self).__init__(
1067 1064 work_dir=work_dir, config=config, **kwargs
1068 1065 )
1069 1066 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1070 1067
1071 1068 def parse_job_id(self, output):
1072 1069 """Take the output of the submit command and return the job id."""
1073 1070 m = self.job_id_regexp.search(output)
1074 1071 if m is not None:
1075 1072 job_id = m.group(self.job_id_regexp_group)
1076 1073 else:
1077 1074 raise LauncherError("Job id couldn't be determined: %s" % output)
1078 1075 self.job_id = job_id
1079 1076 self.log.info('Job submitted with job id: %r', job_id)
1080 1077 return job_id
1081 1078
1082 1079 def write_batch_script(self, n):
1083 1080 """Instantiate and write the batch script to the work_dir."""
1084 1081 self.n = n
1085 1082 # first priority is batch_template if set
1086 1083 if self.batch_template_file and not self.batch_template:
1087 1084 # second priority is batch_template_file
1088 1085 with open(self.batch_template_file) as f:
1089 1086 self.batch_template = f.read()
1090 1087 if not self.batch_template:
1091 1088 # third (last) priority is default_template
1092 1089 self.batch_template = self.default_template
1093 1090 # add jobarray or queue lines to user-specified template
1094 1091 # note that this is *only* when user did not specify a template.
1095 1092 self._insert_queue_in_script()
1096 1093 self._insert_job_array_in_script()
1097 1094 script_as_string = self.formatter.format(self.batch_template, **self.context)
1098 1095 self.log.debug('Writing batch script: %s', self.batch_file)
1099 1096 with open(self.batch_file, 'w') as f:
1100 1097 f.write(script_as_string)
1101 1098 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1102 1099
1103 1100 def _insert_queue_in_script(self):
1104 1101 """Inserts a queue if required into the batch script.
1105 1102 """
1106 1103 if self.queue and not self.queue_regexp.search(self.batch_template):
1107 1104 self.log.debug("adding PBS queue settings to batch script")
1108 1105 firstline, rest = self.batch_template.split('\n',1)
1109 1106 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1110 1107
1111 1108 def _insert_job_array_in_script(self):
1112 1109 """Inserts a job array if required into the batch script.
1113 1110 """
1114 1111 if not self.job_array_regexp.search(self.batch_template):
1115 1112 self.log.debug("adding job array settings to batch script")
1116 1113 firstline, rest = self.batch_template.split('\n',1)
1117 1114 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1118 1115
1119 1116 def start(self, n):
1120 1117 """Start n copies of the process using a batch system."""
1121 1118 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1122 1119 # Here we save profile_dir in the context so they
1123 1120 # can be used in the batch script template as {profile_dir}
1124 1121 self.write_batch_script(n)
1125 1122 output = check_output(self.args, env=os.environ)
1126 1123 output = output.decode(DEFAULT_ENCODING, 'replace')
1127 1124
1128 1125 job_id = self.parse_job_id(output)
1129 1126 self.notify_start(job_id)
1130 1127 return job_id
1131 1128
1132 1129 def stop(self):
1133 1130 try:
1134 1131 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1135 1132 stdout=PIPE, stderr=PIPE)
1136 1133 out, err = p.communicate()
1137 1134 output = out + err
1138 1135 except:
1139 1136 self.log.exception("Problem stopping cluster with command: %s" %
1140 1137 (self.delete_command + [self.job_id]))
1141 1138 output = ""
1142 1139 output = output.decode(DEFAULT_ENCODING, 'replace')
1143 1140 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1144 1141 return output
1145 1142
1146 1143
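# --- Editor's sketch (illustrative only; not part of this file or of the diff) ---
# write_batch_script() above resolves the template in priority order
# (batch_template, then batch_template_file, then default_template) and formats
# it with the context dict, so custom templates may use {n}, {queue},
# {profile_dir} and {cluster_id}. A hypothetical PBS-style template:
#
#     c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
#     #PBS -V
#     #PBS -q {queue}
#     #PBS -t 1-{n}
#     ipengine --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
#     """
# ------------------------------------------------------------------------------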
1147 1144 class PBSLauncher(BatchSystemLauncher):
1148 1145 """A BatchSystemLauncher subclass for PBS."""
1149 1146
1150 1147 submit_command = List(['qsub'], config=True,
1151 1148 help="The PBS submit command ['qsub']")
1152 1149 delete_command = List(['qdel'], config=True,
1153 1150         help="The PBS delete command ['qdel']")
1154 1151 job_id_regexp = CRegExp(r'\d+', config=True,
1155 1152         help="Regular expression for identifying the job ID [r'\d+']")
1156 1153
1157 1154 batch_file = Unicode(u'')
1158 1155 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1159 1156 job_array_template = Unicode('#PBS -t 1-{n}')
1160 1157 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1161 1158 queue_template = Unicode('#PBS -q {queue}')
1162 1159
1163 1160
1164 1161 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1165 1162 """Launch a controller using PBS."""
1166 1163
1167 1164 batch_file_name = Unicode(u'pbs_controller', config=True,
1168 1165 help="batch file name for the controller job.")
1169 1166 default_template= Unicode("""#!/bin/sh
1170 1167 #PBS -V
1171 1168 #PBS -N ipcontroller
1172 1169 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1173 1170 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1174 1171
1175 1172 def start(self):
1176 1173 """Start the controller by profile or profile_dir."""
1177 1174 return super(PBSControllerLauncher, self).start(1)
1178 1175
1179 1176
1180 1177 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1181 1178 """Launch Engines using PBS"""
1182 1179 batch_file_name = Unicode(u'pbs_engines', config=True,
1183 1180 help="batch file name for the engine(s) job.")
1184 1181 default_template= Unicode(u"""#!/bin/sh
1185 1182 #PBS -V
1186 1183 #PBS -N ipengine
1187 1184 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1188 1185 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1189 1186
1190 1187
1191 1188 #SGE is very similar to PBS
1192 1189
1193 1190 class SGELauncher(PBSLauncher):
1194 1191 """Sun GridEngine is a PBS clone with slightly different syntax"""
1195 1192 job_array_regexp = CRegExp('#\$\W+\-t')
1196 1193 job_array_template = Unicode('#$ -t 1-{n}')
1197 1194 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1198 1195 queue_template = Unicode('#$ -q {queue}')
1199 1196
1200 1197
1201 1198 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1202 1199 """Launch a controller using SGE."""
1203 1200
1204 1201 batch_file_name = Unicode(u'sge_controller', config=True,
1205 1202         help="batch file name for the ipcontroller job.")
1206 1203 default_template= Unicode(u"""#$ -V
1207 1204 #$ -S /bin/sh
1208 1205 #$ -N ipcontroller
1209 1206 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1210 1207 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1211 1208
1212 1209 def start(self):
1213 1210 """Start the controller by profile or profile_dir."""
1214 1211 return super(SGEControllerLauncher, self).start(1)
1215 1212
1216 1213
1217 1214 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1218 1215 """Launch Engines with SGE"""
1219 1216 batch_file_name = Unicode(u'sge_engines', config=True,
1220 1217 help="batch file name for the engine(s) job.")
1221 1218 default_template = Unicode("""#$ -V
1222 1219 #$ -S /bin/sh
1223 1220 #$ -N ipengine
1224 1221 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1225 1222 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1226 1223
1227 1224
1228 1225 # LSF launchers
1229 1226
1230 1227 class LSFLauncher(BatchSystemLauncher):
1231 1228 """A BatchSystemLauncher subclass for LSF."""
1232 1229
1233 1230 submit_command = List(['bsub'], config=True,
1234 1231         help="The LSF submit command ['bsub']")
1235 1232 delete_command = List(['bkill'], config=True,
1236 1233         help="The LSF delete command ['bkill']")
1237 1234 job_id_regexp = CRegExp(r'\d+', config=True,
1238 1235         help="Regular expression for identifying the job ID [r'\d+']")
1239 1236
1240 1237 batch_file = Unicode(u'')
1241 1238 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1242 1239 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1243 1240 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1244 1241 queue_template = Unicode('#BSUB -q {queue}')
1245 1242
1246 1243 def start(self, n):
1247 1244 """Start n copies of the process using LSF batch system.
1248 1245         This can't inherit from the base class because bsub expects
1249 1246         to be piped a shell script in order to honor the #BSUB directives:
1250 1247 bsub < script
1251 1248 """
1252 1249 # Here we save profile_dir in the context so they
1253 1250 # can be used in the batch script template as {profile_dir}
1254 1251 self.write_batch_script(n)
1255 1252 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1256 1253 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1257 1254 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1258 1255 output,err = p.communicate()
1259 1256 output = output.decode(DEFAULT_ENCODING, 'replace')
1260 1257 job_id = self.parse_job_id(output)
1261 1258 self.notify_start(job_id)
1262 1259 return job_id
1263 1260
1264 1261
1265 1262 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1266 1263 """Launch a controller using LSF."""
1267 1264
1268 1265 batch_file_name = Unicode(u'lsf_controller', config=True,
1269 1266 help="batch file name for the controller job.")
1270 1267 default_template= Unicode("""#!/bin/sh
1271 1268 #BSUB -J ipcontroller
1272 1269 #BSUB -oo ipcontroller.o.%%J
1273 1270 #BSUB -eo ipcontroller.e.%%J
1274 1271 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1275 1272 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1276 1273
1277 1274 def start(self):
1278 1275 """Start the controller by profile or profile_dir."""
1279 1276 return super(LSFControllerLauncher, self).start(1)
1280 1277
1281 1278
1282 1279 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1283 1280 """Launch Engines using LSF"""
1284 1281 batch_file_name = Unicode(u'lsf_engines', config=True,
1285 1282 help="batch file name for the engine(s) job.")
1286 1283 default_template= Unicode(u"""#!/bin/sh
1287 1284 #BSUB -oo ipengine.o.%%J
1288 1285 #BSUB -eo ipengine.e.%%J
1289 1286 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1290 1287 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1291 1288
1292 1289
1293 1290
1294 1291 class HTCondorLauncher(BatchSystemLauncher):
1295 1292 """A BatchSystemLauncher subclass for HTCondor.
1296 1293
1297 1294     HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1298 1295     than the python instance, but otherwise is very similar to PBS. This is because
1299 1296     HTCondor destroys sys.executable when launching remote processes - a launched
1300 1297     python process depends on sys.executable to effectively evaluate its
1301 1298     module search paths. Without it, regardless of which python interpreter you launch,
1302 1299     you will get only the built-in module search paths.
1303 1300
1304 1301 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1305 1302 this - the mechanism of shebanged scripts means that the python binary will be
1306 1303 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1307 1304 scripts on the remote node*. This means you need to take care that:
1308 1305
1309 1306 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1310 1307 of the python environment you wish to execute code in having top precedence.
1311 1308 b. This functionality is untested on Windows.
1312 1309
1313 1310     If you need different behavior, consider making your own template.
1314 1311 """
1315 1312
1316 1313 submit_command = List(['condor_submit'], config=True,
1317 1314 help="The HTCondor submit command ['condor_submit']")
1318 1315 delete_command = List(['condor_rm'], config=True,
1319 1316 help="The HTCondor delete command ['condor_rm']")
1320 1317 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1321 1318 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1322 1319 job_id_regexp_group = Integer(1, config=True,
1323 1320 help="""The group we wish to match in job_id_regexp [1]""")
1324 1321
1325 1322 job_array_regexp = CRegExp('queue\W+\$')
1326 1323 job_array_template = Unicode('queue {n}')
1327 1324
1328 1325
1329 1326 def _insert_job_array_in_script(self):
1330 1327 """Inserts a job array if required into the batch script.
1331 1328 """
1332 1329 if not self.job_array_regexp.search(self.batch_template):
1333 1330 self.log.debug("adding job array settings to batch script")
1334 1331 #HTCondor requires that the job array goes at the bottom of the script
1335 1332 self.batch_template = '\n'.join([self.batch_template,
1336 1333 self.job_array_template])
1337 1334
1338 1335 def _insert_queue_in_script(self):
1339 1336 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1340 1337 specified in the script.
1341 1338 """
1342 1339 pass
1343 1340
1344 1341
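# --- Editor's sketch (illustrative only; not part of this file or of the diff) ---
# Per the docstring and _insert_job_array_in_script() above, HTCondor submit
# descriptions put the "queue" statement at the bottom, and the launcher runs
# the ipengine/ipcontroller scripts rather than sys.executable. A hypothetical
# custom template, modeled on the defaults below:
#
#     c.HTCondorEngineSetLauncher.batch_template = """
#     universe = vanilla
#     executable = ipengine
#     transfer_executable = False
#     arguments = "'--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
#     queue {n}
#     """
# ------------------------------------------------------------------------------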
1345 1342 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1346 1343 """Launch a controller using HTCondor."""
1347 1344
1348 1345 batch_file_name = Unicode(u'htcondor_controller', config=True,
1349 1346 help="batch file name for the controller job.")
1350 1347 default_template = Unicode(r"""
1351 1348 universe = vanilla
1352 1349 executable = ipcontroller
1353 1350 # by default we expect a shared file system
1354 1351 transfer_executable = False
1355 1352 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1356 1353 """)
1357 1354
1358 1355 def start(self):
1359 1356 """Start the controller by profile or profile_dir."""
1360 1357 return super(HTCondorControllerLauncher, self).start(1)
1361 1358
1362 1359
1363 1360 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1364 1361 """Launch Engines using HTCondor"""
1365 1362 batch_file_name = Unicode(u'htcondor_engines', config=True,
1366 1363 help="batch file name for the engine(s) job.")
1367 1364 default_template = Unicode("""
1368 1365 universe = vanilla
1369 1366 executable = ipengine
1370 1367 # by default we expect a shared file system
1371 1368 transfer_executable = False
1372 1369 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1373 1370 """)
1374 1371
1375 1372
1376 1373 #-----------------------------------------------------------------------------
1377 1374 # A launcher for ipcluster itself!
1378 1375 #-----------------------------------------------------------------------------
1379 1376
1380 1377
1381 1378 class IPClusterLauncher(LocalProcessLauncher):
1382 1379 """Launch the ipcluster program in an external process."""
1383 1380
1384 1381 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1385 1382 help="Popen command for ipcluster")
1386 1383 ipcluster_args = List(
1387 1384 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1388 1385 help="Command line arguments to pass to ipcluster.")
1389 1386 ipcluster_subcommand = Unicode('start')
1390 1387 profile = Unicode('default')
1391 1388 n = Integer(2)
1392 1389
1393 1390 def find_args(self):
1394 1391 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1395 1392 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1396 1393 self.ipcluster_args
1397 1394
1398 1395 def start(self):
1399 1396 return super(IPClusterLauncher, self).start()
1400 1397
1401 1398 #-----------------------------------------------------------------------------
1402 1399 # Collections of launchers
1403 1400 #-----------------------------------------------------------------------------
1404 1401
1405 1402 local_launchers = [
1406 1403 LocalControllerLauncher,
1407 1404 LocalEngineLauncher,
1408 1405 LocalEngineSetLauncher,
1409 1406 ]
1410 1407 mpi_launchers = [
1411 1408 MPILauncher,
1412 1409 MPIControllerLauncher,
1413 1410 MPIEngineSetLauncher,
1414 1411 ]
1415 1412 ssh_launchers = [
1416 1413 SSHLauncher,
1417 1414 SSHControllerLauncher,
1418 1415 SSHEngineLauncher,
1419 1416 SSHEngineSetLauncher,
1420 1417 SSHProxyEngineSetLauncher,
1421 1418 ]
1422 1419 winhpc_launchers = [
1423 1420 WindowsHPCLauncher,
1424 1421 WindowsHPCControllerLauncher,
1425 1422 WindowsHPCEngineSetLauncher,
1426 1423 ]
1427 1424 pbs_launchers = [
1428 1425 PBSLauncher,
1429 1426 PBSControllerLauncher,
1430 1427 PBSEngineSetLauncher,
1431 1428 ]
1432 1429 sge_launchers = [
1433 1430 SGELauncher,
1434 1431 SGEControllerLauncher,
1435 1432 SGEEngineSetLauncher,
1436 1433 ]
1437 1434 lsf_launchers = [
1438 1435 LSFLauncher,
1439 1436 LSFControllerLauncher,
1440 1437 LSFEngineSetLauncher,
1441 1438 ]
1442 1439 htcondor_launchers = [
1443 1440 HTCondorLauncher,
1444 1441 HTCondorControllerLauncher,
1445 1442 HTCondorEngineSetLauncher,
1446 1443 ]
1447 1444 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1448 1445 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers