##// END OF EJS Templates
Merge pull request #4977 from minrk/scp-mkdir-p...
Brian E. Granger -
r15230:d522a1be merge
parent child Browse files
Show More
@@ -1,1455 +1,1463 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
46 46 try:
47 47 from subprocess import check_output
48 48 except ImportError:
49 49 # pre-2.7, define check_output with Popen
50 50 def check_output(*args, **kwargs):
51 51 kwargs.update(dict(stdout=PIPE))
52 52 p = Popen(*args, **kwargs)
53 53 out,err = p.communicate()
54 54 return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.encoding import DEFAULT_ENCODING
65 65 from IPython.utils.path import get_home_dir
66 66 from IPython.utils.process import find_cmd, FindCmdError
67 67 from IPython.utils.py3compat import iteritems, itervalues
68 68
69 69 from .win32support import forward_read_events
70 70
71 71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
72 72
73 73 WINDOWS = os.name == 'nt'
74 74
75 75 #-----------------------------------------------------------------------------
76 76 # Paths to the kernel apps
77 77 #-----------------------------------------------------------------------------
78 78
79 79 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
80 80
81 81 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
82 82
83 83 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
84 84
85 85 if WINDOWS and sys.version_info < (3,):
86 86 # `python -m package` doesn't work on Windows Python 2,
87 87 # but `python -m module` does.
88 88 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipengineapp"]
89 89 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp"]
90 90
91 91 #-----------------------------------------------------------------------------
92 92 # Base launchers and errors
93 93 #-----------------------------------------------------------------------------
94 94
95 95 class LauncherError(Exception):
96 96 pass
97 97
98 98
99 99 class ProcessStateError(LauncherError):
100 100 pass
101 101
102 102
103 103 class UnknownStatus(LauncherError):
104 104 pass
105 105
106 106
107 107 class BaseLauncher(LoggingConfigurable):
108 108 """An asbtraction for starting, stopping and signaling a process."""
109 109
110 110 # In all of the launchers, the work_dir is where child processes will be
111 111 # run. This will usually be the profile_dir, but may not be. any work_dir
112 112 # passed into the __init__ method will override the config value.
113 113 # This should not be used to set the work_dir for the actual engine
114 114 # and controller. Instead, use their own config files or the
115 115 # controller_args, engine_args attributes of the launchers to add
116 116 # the work_dir option.
117 117 work_dir = Unicode(u'.')
118 118 loop = Instance('zmq.eventloop.ioloop.IOLoop')
119 119
120 120 start_data = Any()
121 121 stop_data = Any()
122 122
123 123 def _loop_default(self):
124 124 return ioloop.IOLoop.instance()
125 125
126 126 def __init__(self, work_dir=u'.', config=None, **kwargs):
127 127 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
128 128 self.state = 'before' # can be before, running, after
129 129 self.stop_callbacks = []
130 130 self.start_data = None
131 131 self.stop_data = None
132 132
133 133 @property
134 134 def args(self):
135 135 """A list of cmd and args that will be used to start the process.
136 136
137 137 This is what is passed to :func:`spawnProcess` and the first element
138 138 will be the process name.
139 139 """
140 140 return self.find_args()
141 141
142 142 def find_args(self):
143 143 """The ``.args`` property calls this to find the args list.
144 144
145 145 Subcommand should implement this to construct the cmd and args.
146 146 """
147 147 raise NotImplementedError('find_args must be implemented in a subclass')
148 148
149 149 @property
150 150 def arg_str(self):
151 151 """The string form of the program arguments."""
152 152 return ' '.join(self.args)
153 153
154 154 @property
155 155 def running(self):
156 156 """Am I running."""
157 157 if self.state == 'running':
158 158 return True
159 159 else:
160 160 return False
161 161
162 162 def start(self):
163 163 """Start the process."""
164 164 raise NotImplementedError('start must be implemented in a subclass')
165 165
166 166 def stop(self):
167 167 """Stop the process and notify observers of stopping.
168 168
169 169 This method will return None immediately.
170 170 To observe the actual process stopping, see :meth:`on_stop`.
171 171 """
172 172 raise NotImplementedError('stop must be implemented in a subclass')
173 173
174 174 def on_stop(self, f):
175 175 """Register a callback to be called with this Launcher's stop_data
176 176 when the process actually finishes.
177 177 """
178 178 if self.state=='after':
179 179 return f(self.stop_data)
180 180 else:
181 181 self.stop_callbacks.append(f)
182 182
183 183 def notify_start(self, data):
184 184 """Call this to trigger startup actions.
185 185
186 186 This logs the process startup and sets the state to 'running'. It is
187 187 a pass-through so it can be used as a callback.
188 188 """
189 189
190 190 self.log.debug('Process %r started: %r', self.args[0], data)
191 191 self.start_data = data
192 192 self.state = 'running'
193 193 return data
194 194
195 195 def notify_stop(self, data):
196 196 """Call this to trigger process stop actions.
197 197
198 198 This logs the process stopping and sets the state to 'after'. Call
199 199 this to trigger callbacks registered via :meth:`on_stop`."""
200 200
201 201 self.log.debug('Process %r stopped: %r', self.args[0], data)
202 202 self.stop_data = data
203 203 self.state = 'after'
204 204 for i in range(len(self.stop_callbacks)):
205 205 d = self.stop_callbacks.pop()
206 206 d(data)
207 207 return data
208 208
209 209 def signal(self, sig):
210 210 """Signal the process.
211 211
212 212 Parameters
213 213 ----------
214 214 sig : str or int
215 215 'KILL', 'INT', etc., or any signal number
216 216 """
217 217 raise NotImplementedError('signal must be implemented in a subclass')
218 218
219 219 class ClusterAppMixin(HasTraits):
220 220 """MixIn for cluster args as traits"""
221 221 profile_dir=Unicode('')
222 222 cluster_id=Unicode('')
223 223
224 224 @property
225 225 def cluster_args(self):
226 226 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
227 227
228 228 class ControllerMixin(ClusterAppMixin):
229 229 controller_cmd = List(ipcontroller_cmd_argv, config=True,
230 230 help="""Popen command to launch ipcontroller.""")
231 231 # Command line arguments to ipcontroller.
232 232 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
233 233 help="""command-line args to pass to ipcontroller""")
234 234
235 235 class EngineMixin(ClusterAppMixin):
236 236 engine_cmd = List(ipengine_cmd_argv, config=True,
237 237 help="""command to launch the Engine.""")
238 238 # Command line arguments for ipengine.
239 239 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
240 240 help="command-line arguments to pass to ipengine"
241 241 )
242 242
243 243
244 244 #-----------------------------------------------------------------------------
245 245 # Local process launchers
246 246 #-----------------------------------------------------------------------------
247 247
248 248
249 249 class LocalProcessLauncher(BaseLauncher):
250 250 """Start and stop an external process in an asynchronous manner.
251 251
252 252 This will launch the external process with a working directory of
253 253 ``self.work_dir``.
254 254 """
255 255
256 256 # This is used to to construct self.args, which is passed to
257 257 # spawnProcess.
258 258 cmd_and_args = List([])
259 259 poll_frequency = Integer(100) # in ms
260 260
261 261 def __init__(self, work_dir=u'.', config=None, **kwargs):
262 262 super(LocalProcessLauncher, self).__init__(
263 263 work_dir=work_dir, config=config, **kwargs
264 264 )
265 265 self.process = None
266 266 self.poller = None
267 267
268 268 def find_args(self):
269 269 return self.cmd_and_args
270 270
271 271 def start(self):
272 272 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
273 273 if self.state == 'before':
274 274 self.process = Popen(self.args,
275 275 stdout=PIPE,stderr=PIPE,stdin=PIPE,
276 276 env=os.environ,
277 277 cwd=self.work_dir
278 278 )
279 279 if WINDOWS:
280 280 self.stdout = forward_read_events(self.process.stdout)
281 281 self.stderr = forward_read_events(self.process.stderr)
282 282 else:
283 283 self.stdout = self.process.stdout.fileno()
284 284 self.stderr = self.process.stderr.fileno()
285 285 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
286 286 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
287 287 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
288 288 self.poller.start()
289 289 self.notify_start(self.process.pid)
290 290 else:
291 291 s = 'The process was already started and has state: %r' % self.state
292 292 raise ProcessStateError(s)
293 293
294 294 def stop(self):
295 295 return self.interrupt_then_kill()
296 296
297 297 def signal(self, sig):
298 298 if self.state == 'running':
299 299 if WINDOWS and sig != SIGINT:
300 300 # use Windows tree-kill for better child cleanup
301 301 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
302 302 else:
303 303 self.process.send_signal(sig)
304 304
305 305 def interrupt_then_kill(self, delay=2.0):
306 306 """Send INT, wait a delay and then send KILL."""
307 307 try:
308 308 self.signal(SIGINT)
309 309 except Exception:
310 310 self.log.debug("interrupt failed")
311 311 pass
312 312 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
313 313 self.killer.start()
314 314
315 315 # callbacks, etc:
316 316
317 317 def handle_stdout(self, fd, events):
318 318 if WINDOWS:
319 319 line = self.stdout.recv()
320 320 else:
321 321 line = self.process.stdout.readline()
322 322 # a stopped process will be readable but return empty strings
323 323 if line:
324 324 self.log.debug(line[:-1])
325 325 else:
326 326 self.poll()
327 327
328 328 def handle_stderr(self, fd, events):
329 329 if WINDOWS:
330 330 line = self.stderr.recv()
331 331 else:
332 332 line = self.process.stderr.readline()
333 333 # a stopped process will be readable but return empty strings
334 334 if line:
335 335 self.log.debug(line[:-1])
336 336 else:
337 337 self.poll()
338 338
339 339 def poll(self):
340 340 status = self.process.poll()
341 341 if status is not None:
342 342 self.poller.stop()
343 343 self.loop.remove_handler(self.stdout)
344 344 self.loop.remove_handler(self.stderr)
345 345 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 346 return status
347 347
348 348 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
349 349 """Launch a controller as a regular external process."""
350 350
351 351 def find_args(self):
352 352 return self.controller_cmd + self.cluster_args + self.controller_args
353 353
354 354 def start(self):
355 355 """Start the controller by profile_dir."""
356 356 return super(LocalControllerLauncher, self).start()
357 357
358 358
359 359 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
360 360 """Launch a single engine as a regular externall process."""
361 361
362 362 def find_args(self):
363 363 return self.engine_cmd + self.cluster_args + self.engine_args
364 364
365 365
366 366 class LocalEngineSetLauncher(LocalEngineLauncher):
367 367 """Launch a set of engines as regular external processes."""
368 368
369 369 delay = CFloat(0.1, config=True,
370 370 help="""delay (in seconds) between starting each engine after the first.
371 371 This can help force the engines to get their ids in order, or limit
372 372 process flood when starting many engines."""
373 373 )
374 374
375 375 # launcher class
376 376 launcher_class = LocalEngineLauncher
377 377
378 378 launchers = Dict()
379 379 stop_data = Dict()
380 380
381 381 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 382 super(LocalEngineSetLauncher, self).__init__(
383 383 work_dir=work_dir, config=config, **kwargs
384 384 )
385 385 self.stop_data = {}
386 386
387 387 def start(self, n):
388 388 """Start n engines by profile or profile_dir."""
389 389 dlist = []
390 390 for i in range(n):
391 391 if i > 0:
392 392 time.sleep(self.delay)
393 393 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
394 394 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
395 395 )
396 396
397 397 # Copy the engine args over to each engine launcher.
398 398 el.engine_cmd = copy.deepcopy(self.engine_cmd)
399 399 el.engine_args = copy.deepcopy(self.engine_args)
400 400 el.on_stop(self._notice_engine_stopped)
401 401 d = el.start()
402 402 self.launchers[i] = el
403 403 dlist.append(d)
404 404 self.notify_start(dlist)
405 405 return dlist
406 406
407 407 def find_args(self):
408 408 return ['engine set']
409 409
410 410 def signal(self, sig):
411 411 dlist = []
412 412 for el in itervalues(self.launchers):
413 413 d = el.signal(sig)
414 414 dlist.append(d)
415 415 return dlist
416 416
417 417 def interrupt_then_kill(self, delay=1.0):
418 418 dlist = []
419 419 for el in itervalues(self.launchers):
420 420 d = el.interrupt_then_kill(delay)
421 421 dlist.append(d)
422 422 return dlist
423 423
424 424 def stop(self):
425 425 return self.interrupt_then_kill()
426 426
427 427 def _notice_engine_stopped(self, data):
428 428 pid = data['pid']
429 429 for idx,el in iteritems(self.launchers):
430 430 if el.process.pid == pid:
431 431 break
432 432 self.launchers.pop(idx)
433 433 self.stop_data[idx] = data
434 434 if not self.launchers:
435 435 self.notify_stop(self.stop_data)
436 436
437 437
438 438 #-----------------------------------------------------------------------------
439 439 # MPI launchers
440 440 #-----------------------------------------------------------------------------
441 441
442 442
443 443 class MPILauncher(LocalProcessLauncher):
444 444 """Launch an external process using mpiexec."""
445 445
446 446 mpi_cmd = List(['mpiexec'], config=True,
447 447 help="The mpiexec command to use in starting the process."
448 448 )
449 449 mpi_args = List([], config=True,
450 450 help="The command line arguments to pass to mpiexec."
451 451 )
452 452 program = List(['date'],
453 453 help="The program to start via mpiexec.")
454 454 program_args = List([],
455 455 help="The command line argument to the program."
456 456 )
457 457 n = Integer(1)
458 458
459 459 def __init__(self, *args, **kwargs):
460 460 # deprecation for old MPIExec names:
461 461 config = kwargs.get('config', {})
462 462 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
463 463 deprecated = config.get(oldname)
464 464 if deprecated:
465 465 newname = oldname.replace('MPIExec', 'MPI')
466 466 config[newname].update(deprecated)
467 467 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
468 468
469 469 super(MPILauncher, self).__init__(*args, **kwargs)
470 470
471 471 def find_args(self):
472 472 """Build self.args using all the fields."""
473 473 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
474 474 self.program + self.program_args
475 475
476 476 def start(self, n):
477 477 """Start n instances of the program using mpiexec."""
478 478 self.n = n
479 479 return super(MPILauncher, self).start()
480 480
481 481
482 482 class MPIControllerLauncher(MPILauncher, ControllerMixin):
483 483 """Launch a controller using mpiexec."""
484 484
485 485 # alias back to *non-configurable* program[_args] for use in find_args()
486 486 # this way all Controller/EngineSetLaunchers have the same form, rather
487 487 # than *some* having `program_args` and others `controller_args`
488 488 @property
489 489 def program(self):
490 490 return self.controller_cmd
491 491
492 492 @property
493 493 def program_args(self):
494 494 return self.cluster_args + self.controller_args
495 495
496 496 def start(self):
497 497 """Start the controller by profile_dir."""
498 498 return super(MPIControllerLauncher, self).start(1)
499 499
500 500
501 501 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
502 502 """Launch engines using mpiexec"""
503 503
504 504 # alias back to *non-configurable* program[_args] for use in find_args()
505 505 # this way all Controller/EngineSetLaunchers have the same form, rather
506 506 # than *some* having `program_args` and others `controller_args`
507 507 @property
508 508 def program(self):
509 509 return self.engine_cmd
510 510
511 511 @property
512 512 def program_args(self):
513 513 return self.cluster_args + self.engine_args
514 514
515 515 def start(self, n):
516 516 """Start n engines by profile or profile_dir."""
517 517 self.n = n
518 518 return super(MPIEngineSetLauncher, self).start(n)
519 519
520 520 # deprecated MPIExec names
521 521 class DeprecatedMPILauncher(object):
522 522 def warn(self):
523 523 oldname = self.__class__.__name__
524 524 newname = oldname.replace('MPIExec', 'MPI')
525 525 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
526 526
527 527 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
528 528 """Deprecated, use MPILauncher"""
529 529 def __init__(self, *args, **kwargs):
530 530 super(MPIExecLauncher, self).__init__(*args, **kwargs)
531 531 self.warn()
532 532
533 533 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
534 534 """Deprecated, use MPIControllerLauncher"""
535 535 def __init__(self, *args, **kwargs):
536 536 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
537 537 self.warn()
538 538
539 539 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
540 540 """Deprecated, use MPIEngineSetLauncher"""
541 541 def __init__(self, *args, **kwargs):
542 542 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
543 543 self.warn()
544 544
545 545
546 546 #-----------------------------------------------------------------------------
547 547 # SSH launchers
548 548 #-----------------------------------------------------------------------------
549 549
550 550 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
551 551
552 552 class SSHLauncher(LocalProcessLauncher):
553 553 """A minimal launcher for ssh.
554 554
555 555 To be useful this will probably have to be extended to use the ``sshx``
556 556 idea for environment variables. There could be other things this needs
557 557 as well.
558 558 """
559 559
560 560 ssh_cmd = List(['ssh'], config=True,
561 561 help="command for starting ssh")
562 562 ssh_args = List(['-tt'], config=True,
563 563 help="args to pass to ssh")
564 564 scp_cmd = List(['scp'], config=True,
565 565 help="command for sending files")
566 566 program = List(['date'],
567 567 help="Program to launch via ssh")
568 568 program_args = List([],
569 569 help="args to pass to remote program")
570 570 hostname = Unicode('', config=True,
571 571 help="hostname on which to launch the program")
572 572 user = Unicode('', config=True,
573 573 help="username for ssh")
574 574 location = Unicode('', config=True,
575 575 help="user@hostname location for ssh in one setting")
576 576 to_fetch = List([], config=True,
577 577 help="List of (remote, local) files to fetch after starting")
578 578 to_send = List([], config=True,
579 579 help="List of (local, remote) files to send before starting")
580 580
581 581 def _hostname_changed(self, name, old, new):
582 582 if self.user:
583 583 self.location = u'%s@%s' % (self.user, new)
584 584 else:
585 585 self.location = new
586 586
587 587 def _user_changed(self, name, old, new):
588 588 self.location = u'%s@%s' % (new, self.hostname)
589 589
590 590 def find_args(self):
591 591 return self.ssh_cmd + self.ssh_args + [self.location] + \
592 592 list(map(pipes.quote, self.program + self.program_args))
593 593
594 594 def _send_file(self, local, remote):
595 595 """send a single file"""
596 596 remote = "%s:%s" % (self.location, remote)
597 597 for i in range(10):
598 598 if not os.path.exists(local):
599 599 self.log.debug("waiting for %s" % local)
600 600 time.sleep(1)
601 601 else:
602 602 break
603 remote_dir = os.path.dirname(remote)
604 self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
605 check_output(self.ssh_cmd + self.ssh_args + \
606 [self.location, 'mkdir', '-p', '--', remote_dir]
607 )
603 608 self.log.info("sending %s to %s", local, remote)
604 609 check_output(self.scp_cmd + [local, remote])
605 610
606 611 def send_files(self):
607 612 """send our files (called before start)"""
608 613 if not self.to_send:
609 614 return
610 615 for local_file, remote_file in self.to_send:
611 616 self._send_file(local_file, remote_file)
612 617
613 618 def _fetch_file(self, remote, local):
614 619 """fetch a single file"""
615 620 full_remote = "%s:%s" % (self.location, remote)
616 621 self.log.info("fetching %s from %s", local, full_remote)
617 622 for i in range(10):
618 623 # wait up to 10s for remote file to exist
619 624 check = check_output(self.ssh_cmd + self.ssh_args + \
620 625 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
621 626 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
622 627 if check == u'no':
623 628 time.sleep(1)
624 629 elif check == u'yes':
625 630 break
631 local_dir = os.path.dirname(local)
632 if not os.path.exists(local_dir):
633 os.makedirs(local_dir, 775)
626 634 check_output(self.scp_cmd + [full_remote, local])
627 635
628 636 def fetch_files(self):
629 637 """fetch remote files (called after start)"""
630 638 if not self.to_fetch:
631 639 return
632 640 for remote_file, local_file in self.to_fetch:
633 641 self._fetch_file(remote_file, local_file)
634 642
635 643 def start(self, hostname=None, user=None):
636 644 if hostname is not None:
637 645 self.hostname = hostname
638 646 if user is not None:
639 647 self.user = user
640 648
641 649 self.send_files()
642 650 super(SSHLauncher, self).start()
643 651 self.fetch_files()
644 652
645 653 def signal(self, sig):
646 654 if self.state == 'running':
647 655 # send escaped ssh connection-closer
648 656 self.process.stdin.write('~.')
649 657 self.process.stdin.flush()
650 658
651 659 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
652 660
653 661 remote_profile_dir = Unicode('', config=True,
654 662 help="""The remote profile_dir to use.
655 663
656 664 If not specified, use calling profile, stripping out possible leading homedir.
657 665 """)
658 666
659 667 def _profile_dir_changed(self, name, old, new):
660 668 if not self.remote_profile_dir:
661 669 # trigger remote_profile_dir_default logic again,
662 670 # in case it was already triggered before profile_dir was set
663 671 self.remote_profile_dir = self._strip_home(new)
664 672
665 673 @staticmethod
666 674 def _strip_home(path):
667 675 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
668 676 home = get_home_dir()
669 677 if not home.endswith('/'):
670 678 home = home+'/'
671 679
672 680 if path.startswith(home):
673 681 return path[len(home):]
674 682 else:
675 683 return path
676 684
677 685 def _remote_profile_dir_default(self):
678 686 return self._strip_home(self.profile_dir)
679 687
680 688 def _cluster_id_changed(self, name, old, new):
681 689 if new:
682 690 raise ValueError("cluster id not supported by SSH launchers")
683 691
684 692 @property
685 693 def cluster_args(self):
686 694 return ['--profile-dir', self.remote_profile_dir]
687 695
688 696 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
689 697
690 698 # alias back to *non-configurable* program[_args] for use in find_args()
691 699 # this way all Controller/EngineSetLaunchers have the same form, rather
692 700 # than *some* having `program_args` and others `controller_args`
693 701
694 702 def _controller_cmd_default(self):
695 703 return ['ipcontroller']
696 704
697 705 @property
698 706 def program(self):
699 707 return self.controller_cmd
700 708
701 709 @property
702 710 def program_args(self):
703 711 return self.cluster_args + self.controller_args
704 712
705 713 def _to_fetch_default(self):
706 714 return [
707 715 (os.path.join(self.remote_profile_dir, 'security', cf),
708 716 os.path.join(self.profile_dir, 'security', cf),)
709 717 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
710 718 ]
711 719
712 720 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
713 721
714 722 # alias back to *non-configurable* program[_args] for use in find_args()
715 723 # this way all Controller/EngineSetLaunchers have the same form, rather
716 724 # than *some* having `program_args` and others `controller_args`
717 725
718 726 def _engine_cmd_default(self):
719 727 return ['ipengine']
720 728
721 729 @property
722 730 def program(self):
723 731 return self.engine_cmd
724 732
725 733 @property
726 734 def program_args(self):
727 735 return self.cluster_args + self.engine_args
728 736
729 737 def _to_send_default(self):
730 738 return [
731 739 (os.path.join(self.profile_dir, 'security', cf),
732 740 os.path.join(self.remote_profile_dir, 'security', cf))
733 741 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
734 742 ]
735 743
736 744
737 745 class SSHEngineSetLauncher(LocalEngineSetLauncher):
738 746 launcher_class = SSHEngineLauncher
739 747 engines = Dict(config=True,
740 748 help="""dict of engines to launch. This is a dict by hostname of ints,
741 749 corresponding to the number of engines to start on that host.""")
742 750
743 751 def _engine_cmd_default(self):
744 752 return ['ipengine']
745 753
746 754 @property
747 755 def engine_count(self):
748 756 """determine engine count from `engines` dict"""
749 757 count = 0
750 758 for n in itervalues(self.engines):
751 759 if isinstance(n, (tuple,list)):
752 760 n,args = n
753 761 count += n
754 762 return count
755 763
756 764 def start(self, n):
757 765 """Start engines by profile or profile_dir.
758 766 `n` is ignored, and the `engines` config property is used instead.
759 767 """
760 768
761 769 dlist = []
762 770 for host, n in iteritems(self.engines):
763 771 if isinstance(n, (tuple, list)):
764 772 n, args = n
765 773 else:
766 774 args = copy.deepcopy(self.engine_args)
767 775
768 776 if '@' in host:
769 777 user,host = host.split('@',1)
770 778 else:
771 779 user=None
772 780 for i in range(n):
773 781 if i > 0:
774 782 time.sleep(self.delay)
775 783 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
776 784 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
777 785 )
778 786 if i > 0:
779 787 # only send files for the first engine on each host
780 788 el.to_send = []
781 789
782 790 # Copy the engine args over to each engine launcher.
783 791 el.engine_cmd = self.engine_cmd
784 792 el.engine_args = args
785 793 el.on_stop(self._notice_engine_stopped)
786 794 d = el.start(user=user, hostname=host)
787 795 self.launchers[ "%s/%i" % (host,i) ] = el
788 796 dlist.append(d)
789 797 self.notify_start(dlist)
790 798 return dlist
791 799
792 800
793 801 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
794 802 """Launcher for calling
795 803 `ipcluster engines` on a remote machine.
796 804
797 805 Requires that remote profile is already configured.
798 806 """
799 807
800 808 n = Integer()
801 809 ipcluster_cmd = List(['ipcluster'], config=True)
802 810
803 811 @property
804 812 def program(self):
805 813 return self.ipcluster_cmd + ['engines']
806 814
807 815 @property
808 816 def program_args(self):
809 817 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
810 818
811 819 def _to_send_default(self):
812 820 return [
813 821 (os.path.join(self.profile_dir, 'security', cf),
814 822 os.path.join(self.remote_profile_dir, 'security', cf))
815 823 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
816 824 ]
817 825
818 826 def start(self, n):
819 827 self.n = n
820 828 super(SSHProxyEngineSetLauncher, self).start()
821 829
822 830
823 831 #-----------------------------------------------------------------------------
824 832 # Windows HPC Server 2008 scheduler launchers
825 833 #-----------------------------------------------------------------------------
826 834
827 835
828 836 # This is only used on Windows.
829 837 def find_job_cmd():
830 838 if WINDOWS:
831 839 try:
832 840 return find_cmd('job')
833 841 except (FindCmdError, ImportError):
834 842 # ImportError will be raised if win32api is not installed
835 843 return 'job'
836 844 else:
837 845 return 'job'
838 846
839 847
840 848 class WindowsHPCLauncher(BaseLauncher):
841 849
842 850 job_id_regexp = CRegExp(r'\d+', config=True,
843 851 help="""A regular expression used to get the job id from the output of the
844 852 submit_command. """
845 853 )
846 854 job_file_name = Unicode(u'ipython_job.xml', config=True,
847 855 help="The filename of the instantiated job script.")
848 856 # The full path to the instantiated job script. This gets made dynamically
849 857 # by combining the work_dir with the job_file_name.
850 858 job_file = Unicode(u'')
851 859 scheduler = Unicode('', config=True,
852 860 help="The hostname of the scheduler to submit the job to.")
853 861 job_cmd = Unicode(find_job_cmd(), config=True,
854 862 help="The command for submitting jobs.")
855 863
856 864 def __init__(self, work_dir=u'.', config=None, **kwargs):
857 865 super(WindowsHPCLauncher, self).__init__(
858 866 work_dir=work_dir, config=config, **kwargs
859 867 )
860 868
861 869 @property
862 870 def job_file(self):
863 871 return os.path.join(self.work_dir, self.job_file_name)
864 872
865 873 def write_job_file(self, n):
866 874 raise NotImplementedError("Implement write_job_file in a subclass.")
867 875
868 876 def find_args(self):
869 877 return [u'job.exe']
870 878
871 879 def parse_job_id(self, output):
872 880 """Take the output of the submit command and return the job id."""
873 881 m = self.job_id_regexp.search(output)
874 882 if m is not None:
875 883 job_id = m.group()
876 884 else:
877 885 raise LauncherError("Job id couldn't be determined: %s" % output)
878 886 self.job_id = job_id
879 887 self.log.info('Job started with id: %r', job_id)
880 888 return job_id
881 889
882 890 def start(self, n):
883 891 """Start n copies of the process using the Win HPC job scheduler."""
884 892 self.write_job_file(n)
885 893 args = [
886 894 'submit',
887 895 '/jobfile:%s' % self.job_file,
888 896 '/scheduler:%s' % self.scheduler
889 897 ]
890 898 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
891 899
892 900 output = check_output([self.job_cmd]+args,
893 901 env=os.environ,
894 902 cwd=self.work_dir,
895 903 stderr=STDOUT
896 904 )
897 905 output = output.decode(DEFAULT_ENCODING, 'replace')
898 906 job_id = self.parse_job_id(output)
899 907 self.notify_start(job_id)
900 908 return job_id
901 909
902 910 def stop(self):
903 911 args = [
904 912 'cancel',
905 913 self.job_id,
906 914 '/scheduler:%s' % self.scheduler
907 915 ]
908 916 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
909 917 try:
910 918 output = check_output([self.job_cmd]+args,
911 919 env=os.environ,
912 920 cwd=self.work_dir,
913 921 stderr=STDOUT
914 922 )
915 923 output = output.decode(DEFAULT_ENCODING, 'replace')
916 924 except:
917 925 output = u'The job already appears to be stopped: %r' % self.job_id
918 926 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
919 927 return output
920 928
921 929
922 930 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
923 931
924 932 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
925 933 help="WinHPC xml job file.")
926 934 controller_args = List([], config=False,
927 935 help="extra args to pass to ipcontroller")
928 936
929 937 def write_job_file(self, n):
930 938 job = IPControllerJob(parent=self)
931 939
932 940 t = IPControllerTask(parent=self)
933 941 # The tasks work directory is *not* the actual work directory of
934 942 # the controller. It is used as the base path for the stdout/stderr
935 943 # files that the scheduler redirects to.
936 944 t.work_directory = self.profile_dir
937 945 # Add the profile_dir and from self.start().
938 946 t.controller_args.extend(self.cluster_args)
939 947 t.controller_args.extend(self.controller_args)
940 948 job.add_task(t)
941 949
942 950 self.log.debug("Writing job description file: %s", self.job_file)
943 951 job.write(self.job_file)
944 952
945 953 @property
946 954 def job_file(self):
947 955 return os.path.join(self.profile_dir, self.job_file_name)
948 956
949 957 def start(self):
950 958 """Start the controller by profile_dir."""
951 959 return super(WindowsHPCControllerLauncher, self).start(1)
952 960
953 961
954 962 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
955 963
956 964 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
957 965 help="jobfile for ipengines job")
958 966 engine_args = List([], config=False,
959 967 help="extra args to pas to ipengine")
960 968
961 969 def write_job_file(self, n):
962 970 job = IPEngineSetJob(parent=self)
963 971
964 972 for i in range(n):
965 973 t = IPEngineTask(parent=self)
966 974 # The tasks work directory is *not* the actual work directory of
967 975 # the engine. It is used as the base path for the stdout/stderr
968 976 # files that the scheduler redirects to.
969 977 t.work_directory = self.profile_dir
970 978 # Add the profile_dir and from self.start().
971 979 t.engine_args.extend(self.cluster_args)
972 980 t.engine_args.extend(self.engine_args)
973 981 job.add_task(t)
974 982
975 983 self.log.debug("Writing job description file: %s", self.job_file)
976 984 job.write(self.job_file)
977 985
978 986 @property
979 987 def job_file(self):
980 988 return os.path.join(self.profile_dir, self.job_file_name)
981 989
982 990 def start(self, n):
983 991 """Start the controller by profile_dir."""
984 992 return super(WindowsHPCEngineSetLauncher, self).start(n)
985 993
986 994
987 995 #-----------------------------------------------------------------------------
988 996 # Batch (PBS) system launchers
989 997 #-----------------------------------------------------------------------------
990 998
991 999 class BatchClusterAppMixin(ClusterAppMixin):
992 1000 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
993 1001 def _profile_dir_changed(self, name, old, new):
994 1002 self.context[name] = new
995 1003 _cluster_id_changed = _profile_dir_changed
996 1004
997 1005 def _profile_dir_default(self):
998 1006 self.context['profile_dir'] = ''
999 1007 return ''
1000 1008 def _cluster_id_default(self):
1001 1009 self.context['cluster_id'] = ''
1002 1010 return ''
1003 1011
1004 1012
1005 1013 class BatchSystemLauncher(BaseLauncher):
1006 1014 """Launch an external process using a batch system.
1007 1015
1008 1016 This class is designed to work with UNIX batch systems like PBS, LSF,
1009 1017 GridEngine, etc. The overall model is that there are different commands
1010 1018 like qsub, qdel, etc. that handle the starting and stopping of the process.
1011 1019
1012 1020 This class also has the notion of a batch script. The ``batch_template``
1013 1021 attribute can be set to a string that is a template for the batch script.
1014 1022 This template is instantiated using string formatting. Thus the template can
1015 1023 use {n} fot the number of instances. Subclasses can add additional variables
1016 1024 to the template dict.
1017 1025 """
1018 1026
1019 1027 # Subclasses must fill these in. See PBSEngineSet
1020 1028 submit_command = List([''], config=True,
1021 1029 help="The name of the command line program used to submit jobs.")
1022 1030 delete_command = List([''], config=True,
1023 1031 help="The name of the command line program used to delete jobs.")
1024 1032 job_id_regexp = CRegExp('', config=True,
1025 1033 help="""A regular expression used to get the job id from the output of the
1026 1034 submit_command.""")
1027 1035 job_id_regexp_group = Integer(0, config=True,
1028 1036 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1029 1037 batch_template = Unicode('', config=True,
1030 1038 help="The string that is the batch script template itself.")
1031 1039 batch_template_file = Unicode(u'', config=True,
1032 1040 help="The file that contains the batch template.")
1033 1041 batch_file_name = Unicode(u'batch_script', config=True,
1034 1042 help="The filename of the instantiated batch script.")
1035 1043 queue = Unicode(u'', config=True,
1036 1044 help="The PBS Queue.")
1037 1045
1038 1046 def _queue_changed(self, name, old, new):
1039 1047 self.context[name] = new
1040 1048
1041 1049 n = Integer(1)
1042 1050 _n_changed = _queue_changed
1043 1051
1044 1052 # not configurable, override in subclasses
1045 1053 # PBS Job Array regex
1046 1054 job_array_regexp = CRegExp('')
1047 1055 job_array_template = Unicode('')
1048 1056 # PBS Queue regex
1049 1057 queue_regexp = CRegExp('')
1050 1058 queue_template = Unicode('')
1051 1059 # The default batch template, override in subclasses
1052 1060 default_template = Unicode('')
1053 1061 # The full path to the instantiated batch script.
1054 1062 batch_file = Unicode(u'')
1055 1063 # the format dict used with batch_template:
1056 1064 context = Dict()
1057 1065
1058 1066 def _context_default(self):
1059 1067 """load the default context with the default values for the basic keys
1060 1068
1061 1069 because the _trait_changed methods only load the context if they
1062 1070 are set to something other than the default value.
1063 1071 """
1064 1072 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1065 1073
1066 1074 # the Formatter instance for rendering the templates:
1067 1075 formatter = Instance(EvalFormatter, (), {})
1068 1076
1069 1077 def find_args(self):
1070 1078 return self.submit_command + [self.batch_file]
1071 1079
1072 1080 def __init__(self, work_dir=u'.', config=None, **kwargs):
1073 1081 super(BatchSystemLauncher, self).__init__(
1074 1082 work_dir=work_dir, config=config, **kwargs
1075 1083 )
1076 1084 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1077 1085
1078 1086 def parse_job_id(self, output):
1079 1087 """Take the output of the submit command and return the job id."""
1080 1088 m = self.job_id_regexp.search(output)
1081 1089 if m is not None:
1082 1090 job_id = m.group(self.job_id_regexp_group)
1083 1091 else:
1084 1092 raise LauncherError("Job id couldn't be determined: %s" % output)
1085 1093 self.job_id = job_id
1086 1094 self.log.info('Job submitted with job id: %r', job_id)
1087 1095 return job_id
1088 1096
1089 1097 def write_batch_script(self, n):
1090 1098 """Instantiate and write the batch script to the work_dir."""
1091 1099 self.n = n
1092 1100 # first priority is batch_template if set
1093 1101 if self.batch_template_file and not self.batch_template:
1094 1102 # second priority is batch_template_file
1095 1103 with open(self.batch_template_file) as f:
1096 1104 self.batch_template = f.read()
1097 1105 if not self.batch_template:
1098 1106 # third (last) priority is default_template
1099 1107 self.batch_template = self.default_template
1100 1108 # add jobarray or queue lines to user-specified template
1101 1109 # note that this is *only* when user did not specify a template.
1102 1110 self._insert_queue_in_script()
1103 1111 self._insert_job_array_in_script()
1104 1112 script_as_string = self.formatter.format(self.batch_template, **self.context)
1105 1113 self.log.debug('Writing batch script: %s', self.batch_file)
1106 1114 with open(self.batch_file, 'w') as f:
1107 1115 f.write(script_as_string)
1108 1116 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1109 1117
1110 1118 def _insert_queue_in_script(self):
1111 1119 """Inserts a queue if required into the batch script.
1112 1120 """
1113 1121 if self.queue and not self.queue_regexp.search(self.batch_template):
1114 1122 self.log.debug("adding PBS queue settings to batch script")
1115 1123 firstline, rest = self.batch_template.split('\n',1)
1116 1124 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1117 1125
1118 1126 def _insert_job_array_in_script(self):
1119 1127 """Inserts a job array if required into the batch script.
1120 1128 """
1121 1129 if not self.job_array_regexp.search(self.batch_template):
1122 1130 self.log.debug("adding job array settings to batch script")
1123 1131 firstline, rest = self.batch_template.split('\n',1)
1124 1132 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1125 1133
1126 1134 def start(self, n):
1127 1135 """Start n copies of the process using a batch system."""
1128 1136 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1129 1137 # Here we save profile_dir in the context so they
1130 1138 # can be used in the batch script template as {profile_dir}
1131 1139 self.write_batch_script(n)
1132 1140 output = check_output(self.args, env=os.environ)
1133 1141 output = output.decode(DEFAULT_ENCODING, 'replace')
1134 1142
1135 1143 job_id = self.parse_job_id(output)
1136 1144 self.notify_start(job_id)
1137 1145 return job_id
1138 1146
1139 1147 def stop(self):
1140 1148 try:
1141 1149 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1142 1150 stdout=PIPE, stderr=PIPE)
1143 1151 out, err = p.communicate()
1144 1152 output = out + err
1145 1153 except:
1146 1154 self.log.exception("Problem stopping cluster with command: %s" %
1147 1155 (self.delete_command + [self.job_id]))
1148 1156 output = ""
1149 1157 output = output.decode(DEFAULT_ENCODING, 'replace')
1150 1158 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1151 1159 return output
1152 1160
1153 1161
1154 1162 class PBSLauncher(BatchSystemLauncher):
1155 1163 """A BatchSystemLauncher subclass for PBS."""
1156 1164
1157 1165 submit_command = List(['qsub'], config=True,
1158 1166 help="The PBS submit command ['qsub']")
1159 1167 delete_command = List(['qdel'], config=True,
1160 1168 help="The PBS delete command ['qsub']")
1161 1169 job_id_regexp = CRegExp(r'\d+', config=True,
1162 1170 help="Regular expresion for identifying the job ID [r'\d+']")
1163 1171
1164 1172 batch_file = Unicode(u'')
1165 1173 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1166 1174 job_array_template = Unicode('#PBS -t 1-{n}')
1167 1175 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1168 1176 queue_template = Unicode('#PBS -q {queue}')
1169 1177
1170 1178
1171 1179 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1172 1180 """Launch a controller using PBS."""
1173 1181
1174 1182 batch_file_name = Unicode(u'pbs_controller', config=True,
1175 1183 help="batch file name for the controller job.")
1176 1184 default_template= Unicode("""#!/bin/sh
1177 1185 #PBS -V
1178 1186 #PBS -N ipcontroller
1179 1187 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1180 1188 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1181 1189
1182 1190 def start(self):
1183 1191 """Start the controller by profile or profile_dir."""
1184 1192 return super(PBSControllerLauncher, self).start(1)
1185 1193
1186 1194
1187 1195 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1188 1196 """Launch Engines using PBS"""
1189 1197 batch_file_name = Unicode(u'pbs_engines', config=True,
1190 1198 help="batch file name for the engine(s) job.")
1191 1199 default_template= Unicode(u"""#!/bin/sh
1192 1200 #PBS -V
1193 1201 #PBS -N ipengine
1194 1202 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1195 1203 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1196 1204
1197 1205
1198 1206 #SGE is very similar to PBS
1199 1207
1200 1208 class SGELauncher(PBSLauncher):
1201 1209 """Sun GridEngine is a PBS clone with slightly different syntax"""
1202 1210 job_array_regexp = CRegExp('#\$\W+\-t')
1203 1211 job_array_template = Unicode('#$ -t 1-{n}')
1204 1212 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1205 1213 queue_template = Unicode('#$ -q {queue}')
1206 1214
1207 1215
1208 1216 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1209 1217 """Launch a controller using SGE."""
1210 1218
1211 1219 batch_file_name = Unicode(u'sge_controller', config=True,
1212 1220 help="batch file name for the ipontroller job.")
1213 1221 default_template= Unicode(u"""#$ -V
1214 1222 #$ -S /bin/sh
1215 1223 #$ -N ipcontroller
1216 1224 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1217 1225 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1218 1226
1219 1227 def start(self):
1220 1228 """Start the controller by profile or profile_dir."""
1221 1229 return super(SGEControllerLauncher, self).start(1)
1222 1230
1223 1231
1224 1232 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1225 1233 """Launch Engines with SGE"""
1226 1234 batch_file_name = Unicode(u'sge_engines', config=True,
1227 1235 help="batch file name for the engine(s) job.")
1228 1236 default_template = Unicode("""#$ -V
1229 1237 #$ -S /bin/sh
1230 1238 #$ -N ipengine
1231 1239 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1232 1240 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1233 1241
1234 1242
1235 1243 # LSF launchers
1236 1244
1237 1245 class LSFLauncher(BatchSystemLauncher):
1238 1246 """A BatchSystemLauncher subclass for LSF."""
1239 1247
1240 1248 submit_command = List(['bsub'], config=True,
1241 1249 help="The PBS submit command ['bsub']")
1242 1250 delete_command = List(['bkill'], config=True,
1243 1251 help="The PBS delete command ['bkill']")
1244 1252 job_id_regexp = CRegExp(r'\d+', config=True,
1245 1253 help="Regular expresion for identifying the job ID [r'\d+']")
1246 1254
1247 1255 batch_file = Unicode(u'')
1248 1256 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1249 1257 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1250 1258 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1251 1259 queue_template = Unicode('#BSUB -q {queue}')
1252 1260
1253 1261 def start(self, n):
1254 1262 """Start n copies of the process using LSF batch system.
1255 1263 This cant inherit from the base class because bsub expects
1256 1264 to be piped a shell script in order to honor the #BSUB directives :
1257 1265 bsub < script
1258 1266 """
1259 1267 # Here we save profile_dir in the context so they
1260 1268 # can be used in the batch script template as {profile_dir}
1261 1269 self.write_batch_script(n)
1262 1270 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1263 1271 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1264 1272 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1265 1273 output,err = p.communicate()
1266 1274 output = output.decode(DEFAULT_ENCODING, 'replace')
1267 1275 job_id = self.parse_job_id(output)
1268 1276 self.notify_start(job_id)
1269 1277 return job_id
1270 1278
1271 1279
1272 1280 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1273 1281 """Launch a controller using LSF."""
1274 1282
1275 1283 batch_file_name = Unicode(u'lsf_controller', config=True,
1276 1284 help="batch file name for the controller job.")
1277 1285 default_template= Unicode("""#!/bin/sh
1278 1286 #BSUB -J ipcontroller
1279 1287 #BSUB -oo ipcontroller.o.%%J
1280 1288 #BSUB -eo ipcontroller.e.%%J
1281 1289 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1282 1290 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1283 1291
1284 1292 def start(self):
1285 1293 """Start the controller by profile or profile_dir."""
1286 1294 return super(LSFControllerLauncher, self).start(1)
1287 1295
1288 1296
1289 1297 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1290 1298 """Launch Engines using LSF"""
1291 1299 batch_file_name = Unicode(u'lsf_engines', config=True,
1292 1300 help="batch file name for the engine(s) job.")
1293 1301 default_template= Unicode(u"""#!/bin/sh
1294 1302 #BSUB -oo ipengine.o.%%J
1295 1303 #BSUB -eo ipengine.e.%%J
1296 1304 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1297 1305 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1298 1306
1299 1307
1300 1308
1301 1309 class HTCondorLauncher(BatchSystemLauncher):
1302 1310 """A BatchSystemLauncher subclass for HTCondor.
1303 1311
1304 1312 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1305 1313 that the python instance but otherwise is very similar to PBS. This is because
1306 1314 HTCondor destroys sys.executable when launching remote processes - a launched
1307 1315 python process depends on sys.executable to effectively evaluate its
1308 1316 module search paths. Without it, regardless of which python interpreter you launch
1309 1317 you will get the to built in module search paths.
1310 1318
1311 1319 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1312 1320 this - the mechanism of shebanged scripts means that the python binary will be
1313 1321 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1314 1322 scripts on the remote node*. This means you need to take care that:
1315 1323
1316 1324 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1317 1325 of the python environment you wish to execute code in having top precedence.
1318 1326 b. This functionality is untested on Windows.
1319 1327
1320 1328 If you need different behavior, consider making you own template.
1321 1329 """
1322 1330
1323 1331 submit_command = List(['condor_submit'], config=True,
1324 1332 help="The HTCondor submit command ['condor_submit']")
1325 1333 delete_command = List(['condor_rm'], config=True,
1326 1334 help="The HTCondor delete command ['condor_rm']")
1327 1335 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1328 1336 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1329 1337 job_id_regexp_group = Integer(1, config=True,
1330 1338 help="""The group we wish to match in job_id_regexp [1]""")
1331 1339
1332 1340 job_array_regexp = CRegExp('queue\W+\$')
1333 1341 job_array_template = Unicode('queue {n}')
1334 1342
1335 1343
1336 1344 def _insert_job_array_in_script(self):
1337 1345 """Inserts a job array if required into the batch script.
1338 1346 """
1339 1347 if not self.job_array_regexp.search(self.batch_template):
1340 1348 self.log.debug("adding job array settings to batch script")
1341 1349 #HTCondor requires that the job array goes at the bottom of the script
1342 1350 self.batch_template = '\n'.join([self.batch_template,
1343 1351 self.job_array_template])
1344 1352
1345 1353 def _insert_queue_in_script(self):
1346 1354 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1347 1355 specified in the script.
1348 1356 """
1349 1357 pass
1350 1358
1351 1359
1352 1360 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1353 1361 """Launch a controller using HTCondor."""
1354 1362
1355 1363 batch_file_name = Unicode(u'htcondor_controller', config=True,
1356 1364 help="batch file name for the controller job.")
1357 1365 default_template = Unicode(r"""
1358 1366 universe = vanilla
1359 1367 executable = ipcontroller
1360 1368 # by default we expect a shared file system
1361 1369 transfer_executable = False
1362 1370 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1363 1371 """)
1364 1372
1365 1373 def start(self):
1366 1374 """Start the controller by profile or profile_dir."""
1367 1375 return super(HTCondorControllerLauncher, self).start(1)
1368 1376
1369 1377
1370 1378 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1371 1379 """Launch Engines using HTCondor"""
1372 1380 batch_file_name = Unicode(u'htcondor_engines', config=True,
1373 1381 help="batch file name for the engine(s) job.")
1374 1382 default_template = Unicode("""
1375 1383 universe = vanilla
1376 1384 executable = ipengine
1377 1385 # by default we expect a shared file system
1378 1386 transfer_executable = False
1379 1387 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1380 1388 """)
1381 1389
1382 1390
1383 1391 #-----------------------------------------------------------------------------
1384 1392 # A launcher for ipcluster itself!
1385 1393 #-----------------------------------------------------------------------------
1386 1394
1387 1395
1388 1396 class IPClusterLauncher(LocalProcessLauncher):
1389 1397 """Launch the ipcluster program in an external process."""
1390 1398
1391 1399 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1392 1400 help="Popen command for ipcluster")
1393 1401 ipcluster_args = List(
1394 1402 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1395 1403 help="Command line arguments to pass to ipcluster.")
1396 1404 ipcluster_subcommand = Unicode('start')
1397 1405 profile = Unicode('default')
1398 1406 n = Integer(2)
1399 1407
1400 1408 def find_args(self):
1401 1409 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1402 1410 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1403 1411 self.ipcluster_args
1404 1412
1405 1413 def start(self):
1406 1414 return super(IPClusterLauncher, self).start()
1407 1415
1408 1416 #-----------------------------------------------------------------------------
1409 1417 # Collections of launchers
1410 1418 #-----------------------------------------------------------------------------
1411 1419
1412 1420 local_launchers = [
1413 1421 LocalControllerLauncher,
1414 1422 LocalEngineLauncher,
1415 1423 LocalEngineSetLauncher,
1416 1424 ]
1417 1425 mpi_launchers = [
1418 1426 MPILauncher,
1419 1427 MPIControllerLauncher,
1420 1428 MPIEngineSetLauncher,
1421 1429 ]
1422 1430 ssh_launchers = [
1423 1431 SSHLauncher,
1424 1432 SSHControllerLauncher,
1425 1433 SSHEngineLauncher,
1426 1434 SSHEngineSetLauncher,
1427 1435 SSHProxyEngineSetLauncher,
1428 1436 ]
1429 1437 winhpc_launchers = [
1430 1438 WindowsHPCLauncher,
1431 1439 WindowsHPCControllerLauncher,
1432 1440 WindowsHPCEngineSetLauncher,
1433 1441 ]
1434 1442 pbs_launchers = [
1435 1443 PBSLauncher,
1436 1444 PBSControllerLauncher,
1437 1445 PBSEngineSetLauncher,
1438 1446 ]
1439 1447 sge_launchers = [
1440 1448 SGELauncher,
1441 1449 SGEControllerLauncher,
1442 1450 SGEEngineSetLauncher,
1443 1451 ]
1444 1452 lsf_launchers = [
1445 1453 LSFLauncher,
1446 1454 LSFControllerLauncher,
1447 1455 LSFEngineSetLauncher,
1448 1456 ]
1449 1457 htcondor_launchers = [
1450 1458 HTCondorLauncher,
1451 1459 HTCondorControllerLauncher,
1452 1460 HTCondorEngineSetLauncher,
1453 1461 ]
1454 1462 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1455 1463 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
General Comments 0
You need to be logged in to leave comments. Login now