##// END OF EJS Templates
fix typo in ssh launcher send_file...
MinRK -
Show More
@@ -1,1463 +1,1463 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
46 46 try:
47 47 from subprocess import check_output
48 48 except ImportError:
49 49 # pre-2.7, define check_output with Popen
50 50 def check_output(*args, **kwargs):
51 51 kwargs.update(dict(stdout=PIPE))
52 52 p = Popen(*args, **kwargs)
53 53 out,err = p.communicate()
54 54 return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.encoding import DEFAULT_ENCODING
65 65 from IPython.utils.path import get_home_dir
66 66 from IPython.utils.process import find_cmd, FindCmdError
67 67 from IPython.utils.py3compat import iteritems, itervalues
68 68
69 69 from .win32support import forward_read_events
70 70
71 71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
72 72
73 73 WINDOWS = os.name == 'nt'
74 74
75 75 #-----------------------------------------------------------------------------
76 76 # Paths to the kernel apps
77 77 #-----------------------------------------------------------------------------
78 78
79 79 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
80 80
81 81 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
82 82
83 83 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
84 84
85 85 if WINDOWS and sys.version_info < (3,):
86 86 # `python -m package` doesn't work on Windows Python 2,
87 87 # but `python -m module` does.
88 88 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipengineapp"]
89 89 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp"]
90 90
91 91 #-----------------------------------------------------------------------------
92 92 # Base launchers and errors
93 93 #-----------------------------------------------------------------------------
94 94
95 95 class LauncherError(Exception):
96 96 pass
97 97
98 98
99 99 class ProcessStateError(LauncherError):
100 100 pass
101 101
102 102
103 103 class UnknownStatus(LauncherError):
104 104 pass
105 105
106 106
107 107 class BaseLauncher(LoggingConfigurable):
108 108 """An asbtraction for starting, stopping and signaling a process."""
109 109
110 110 # In all of the launchers, the work_dir is where child processes will be
111 111 # run. This will usually be the profile_dir, but may not be. any work_dir
112 112 # passed into the __init__ method will override the config value.
113 113 # This should not be used to set the work_dir for the actual engine
114 114 # and controller. Instead, use their own config files or the
115 115 # controller_args, engine_args attributes of the launchers to add
116 116 # the work_dir option.
117 117 work_dir = Unicode(u'.')
118 118 loop = Instance('zmq.eventloop.ioloop.IOLoop')
119 119
120 120 start_data = Any()
121 121 stop_data = Any()
122 122
123 123 def _loop_default(self):
124 124 return ioloop.IOLoop.instance()
125 125
126 126 def __init__(self, work_dir=u'.', config=None, **kwargs):
127 127 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
128 128 self.state = 'before' # can be before, running, after
129 129 self.stop_callbacks = []
130 130 self.start_data = None
131 131 self.stop_data = None
132 132
133 133 @property
134 134 def args(self):
135 135 """A list of cmd and args that will be used to start the process.
136 136
137 137 This is what is passed to :func:`spawnProcess` and the first element
138 138 will be the process name.
139 139 """
140 140 return self.find_args()
141 141
142 142 def find_args(self):
143 143 """The ``.args`` property calls this to find the args list.
144 144
145 145 Subcommand should implement this to construct the cmd and args.
146 146 """
147 147 raise NotImplementedError('find_args must be implemented in a subclass')
148 148
149 149 @property
150 150 def arg_str(self):
151 151 """The string form of the program arguments."""
152 152 return ' '.join(self.args)
153 153
154 154 @property
155 155 def running(self):
156 156 """Am I running."""
157 157 if self.state == 'running':
158 158 return True
159 159 else:
160 160 return False
161 161
162 162 def start(self):
163 163 """Start the process."""
164 164 raise NotImplementedError('start must be implemented in a subclass')
165 165
166 166 def stop(self):
167 167 """Stop the process and notify observers of stopping.
168 168
169 169 This method will return None immediately.
170 170 To observe the actual process stopping, see :meth:`on_stop`.
171 171 """
172 172 raise NotImplementedError('stop must be implemented in a subclass')
173 173
174 174 def on_stop(self, f):
175 175 """Register a callback to be called with this Launcher's stop_data
176 176 when the process actually finishes.
177 177 """
178 178 if self.state=='after':
179 179 return f(self.stop_data)
180 180 else:
181 181 self.stop_callbacks.append(f)
182 182
183 183 def notify_start(self, data):
184 184 """Call this to trigger startup actions.
185 185
186 186 This logs the process startup and sets the state to 'running'. It is
187 187 a pass-through so it can be used as a callback.
188 188 """
189 189
190 190 self.log.debug('Process %r started: %r', self.args[0], data)
191 191 self.start_data = data
192 192 self.state = 'running'
193 193 return data
194 194
195 195 def notify_stop(self, data):
196 196 """Call this to trigger process stop actions.
197 197
198 198 This logs the process stopping and sets the state to 'after'. Call
199 199 this to trigger callbacks registered via :meth:`on_stop`."""
200 200
201 201 self.log.debug('Process %r stopped: %r', self.args[0], data)
202 202 self.stop_data = data
203 203 self.state = 'after'
204 204 for i in range(len(self.stop_callbacks)):
205 205 d = self.stop_callbacks.pop()
206 206 d(data)
207 207 return data
208 208
209 209 def signal(self, sig):
210 210 """Signal the process.
211 211
212 212 Parameters
213 213 ----------
214 214 sig : str or int
215 215 'KILL', 'INT', etc., or any signal number
216 216 """
217 217 raise NotImplementedError('signal must be implemented in a subclass')
218 218
219 219 class ClusterAppMixin(HasTraits):
220 220 """MixIn for cluster args as traits"""
221 221 profile_dir=Unicode('')
222 222 cluster_id=Unicode('')
223 223
224 224 @property
225 225 def cluster_args(self):
226 226 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
227 227
228 228 class ControllerMixin(ClusterAppMixin):
229 229 controller_cmd = List(ipcontroller_cmd_argv, config=True,
230 230 help="""Popen command to launch ipcontroller.""")
231 231 # Command line arguments to ipcontroller.
232 232 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
233 233 help="""command-line args to pass to ipcontroller""")
234 234
235 235 class EngineMixin(ClusterAppMixin):
236 236 engine_cmd = List(ipengine_cmd_argv, config=True,
237 237 help="""command to launch the Engine.""")
238 238 # Command line arguments for ipengine.
239 239 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
240 240 help="command-line arguments to pass to ipengine"
241 241 )
242 242
243 243
244 244 #-----------------------------------------------------------------------------
245 245 # Local process launchers
246 246 #-----------------------------------------------------------------------------
247 247
248 248
249 249 class LocalProcessLauncher(BaseLauncher):
250 250 """Start and stop an external process in an asynchronous manner.
251 251
252 252 This will launch the external process with a working directory of
253 253 ``self.work_dir``.
254 254 """
255 255
256 256 # This is used to to construct self.args, which is passed to
257 257 # spawnProcess.
258 258 cmd_and_args = List([])
259 259 poll_frequency = Integer(100) # in ms
260 260
261 261 def __init__(self, work_dir=u'.', config=None, **kwargs):
262 262 super(LocalProcessLauncher, self).__init__(
263 263 work_dir=work_dir, config=config, **kwargs
264 264 )
265 265 self.process = None
266 266 self.poller = None
267 267
268 268 def find_args(self):
269 269 return self.cmd_and_args
270 270
271 271 def start(self):
272 272 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
273 273 if self.state == 'before':
274 274 self.process = Popen(self.args,
275 275 stdout=PIPE,stderr=PIPE,stdin=PIPE,
276 276 env=os.environ,
277 277 cwd=self.work_dir
278 278 )
279 279 if WINDOWS:
280 280 self.stdout = forward_read_events(self.process.stdout)
281 281 self.stderr = forward_read_events(self.process.stderr)
282 282 else:
283 283 self.stdout = self.process.stdout.fileno()
284 284 self.stderr = self.process.stderr.fileno()
285 285 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
286 286 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
287 287 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
288 288 self.poller.start()
289 289 self.notify_start(self.process.pid)
290 290 else:
291 291 s = 'The process was already started and has state: %r' % self.state
292 292 raise ProcessStateError(s)
293 293
294 294 def stop(self):
295 295 return self.interrupt_then_kill()
296 296
297 297 def signal(self, sig):
298 298 if self.state == 'running':
299 299 if WINDOWS and sig != SIGINT:
300 300 # use Windows tree-kill for better child cleanup
301 301 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
302 302 else:
303 303 self.process.send_signal(sig)
304 304
305 305 def interrupt_then_kill(self, delay=2.0):
306 306 """Send INT, wait a delay and then send KILL."""
307 307 try:
308 308 self.signal(SIGINT)
309 309 except Exception:
310 310 self.log.debug("interrupt failed")
311 311 pass
312 312 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
313 313 self.killer.start()
314 314
315 315 # callbacks, etc:
316 316
317 317 def handle_stdout(self, fd, events):
318 318 if WINDOWS:
319 319 line = self.stdout.recv()
320 320 else:
321 321 line = self.process.stdout.readline()
322 322 # a stopped process will be readable but return empty strings
323 323 if line:
324 324 self.log.debug(line[:-1])
325 325 else:
326 326 self.poll()
327 327
328 328 def handle_stderr(self, fd, events):
329 329 if WINDOWS:
330 330 line = self.stderr.recv()
331 331 else:
332 332 line = self.process.stderr.readline()
333 333 # a stopped process will be readable but return empty strings
334 334 if line:
335 335 self.log.debug(line[:-1])
336 336 else:
337 337 self.poll()
338 338
339 339 def poll(self):
340 340 status = self.process.poll()
341 341 if status is not None:
342 342 self.poller.stop()
343 343 self.loop.remove_handler(self.stdout)
344 344 self.loop.remove_handler(self.stderr)
345 345 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 346 return status
347 347
348 348 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
349 349 """Launch a controller as a regular external process."""
350 350
351 351 def find_args(self):
352 352 return self.controller_cmd + self.cluster_args + self.controller_args
353 353
354 354 def start(self):
355 355 """Start the controller by profile_dir."""
356 356 return super(LocalControllerLauncher, self).start()
357 357
358 358
359 359 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
360 360 """Launch a single engine as a regular externall process."""
361 361
362 362 def find_args(self):
363 363 return self.engine_cmd + self.cluster_args + self.engine_args
364 364
365 365
366 366 class LocalEngineSetLauncher(LocalEngineLauncher):
367 367 """Launch a set of engines as regular external processes."""
368 368
369 369 delay = CFloat(0.1, config=True,
370 370 help="""delay (in seconds) between starting each engine after the first.
371 371 This can help force the engines to get their ids in order, or limit
372 372 process flood when starting many engines."""
373 373 )
374 374
375 375 # launcher class
376 376 launcher_class = LocalEngineLauncher
377 377
378 378 launchers = Dict()
379 379 stop_data = Dict()
380 380
381 381 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 382 super(LocalEngineSetLauncher, self).__init__(
383 383 work_dir=work_dir, config=config, **kwargs
384 384 )
385 385 self.stop_data = {}
386 386
387 387 def start(self, n):
388 388 """Start n engines by profile or profile_dir."""
389 389 dlist = []
390 390 for i in range(n):
391 391 if i > 0:
392 392 time.sleep(self.delay)
393 393 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
394 394 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
395 395 )
396 396
397 397 # Copy the engine args over to each engine launcher.
398 398 el.engine_cmd = copy.deepcopy(self.engine_cmd)
399 399 el.engine_args = copy.deepcopy(self.engine_args)
400 400 el.on_stop(self._notice_engine_stopped)
401 401 d = el.start()
402 402 self.launchers[i] = el
403 403 dlist.append(d)
404 404 self.notify_start(dlist)
405 405 return dlist
406 406
407 407 def find_args(self):
408 408 return ['engine set']
409 409
410 410 def signal(self, sig):
411 411 dlist = []
412 412 for el in itervalues(self.launchers):
413 413 d = el.signal(sig)
414 414 dlist.append(d)
415 415 return dlist
416 416
417 417 def interrupt_then_kill(self, delay=1.0):
418 418 dlist = []
419 419 for el in itervalues(self.launchers):
420 420 d = el.interrupt_then_kill(delay)
421 421 dlist.append(d)
422 422 return dlist
423 423
424 424 def stop(self):
425 425 return self.interrupt_then_kill()
426 426
427 427 def _notice_engine_stopped(self, data):
428 428 pid = data['pid']
429 429 for idx,el in iteritems(self.launchers):
430 430 if el.process.pid == pid:
431 431 break
432 432 self.launchers.pop(idx)
433 433 self.stop_data[idx] = data
434 434 if not self.launchers:
435 435 self.notify_stop(self.stop_data)
436 436
437 437
438 438 #-----------------------------------------------------------------------------
439 439 # MPI launchers
440 440 #-----------------------------------------------------------------------------
441 441
442 442
443 443 class MPILauncher(LocalProcessLauncher):
444 444 """Launch an external process using mpiexec."""
445 445
446 446 mpi_cmd = List(['mpiexec'], config=True,
447 447 help="The mpiexec command to use in starting the process."
448 448 )
449 449 mpi_args = List([], config=True,
450 450 help="The command line arguments to pass to mpiexec."
451 451 )
452 452 program = List(['date'],
453 453 help="The program to start via mpiexec.")
454 454 program_args = List([],
455 455 help="The command line argument to the program."
456 456 )
457 457 n = Integer(1)
458 458
459 459 def __init__(self, *args, **kwargs):
460 460 # deprecation for old MPIExec names:
461 461 config = kwargs.get('config', {})
462 462 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
463 463 deprecated = config.get(oldname)
464 464 if deprecated:
465 465 newname = oldname.replace('MPIExec', 'MPI')
466 466 config[newname].update(deprecated)
467 467 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
468 468
469 469 super(MPILauncher, self).__init__(*args, **kwargs)
470 470
471 471 def find_args(self):
472 472 """Build self.args using all the fields."""
473 473 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
474 474 self.program + self.program_args
475 475
476 476 def start(self, n):
477 477 """Start n instances of the program using mpiexec."""
478 478 self.n = n
479 479 return super(MPILauncher, self).start()
480 480
481 481
482 482 class MPIControllerLauncher(MPILauncher, ControllerMixin):
483 483 """Launch a controller using mpiexec."""
484 484
485 485 # alias back to *non-configurable* program[_args] for use in find_args()
486 486 # this way all Controller/EngineSetLaunchers have the same form, rather
487 487 # than *some* having `program_args` and others `controller_args`
488 488 @property
489 489 def program(self):
490 490 return self.controller_cmd
491 491
492 492 @property
493 493 def program_args(self):
494 494 return self.cluster_args + self.controller_args
495 495
496 496 def start(self):
497 497 """Start the controller by profile_dir."""
498 498 return super(MPIControllerLauncher, self).start(1)
499 499
500 500
501 501 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
502 502 """Launch engines using mpiexec"""
503 503
504 504 # alias back to *non-configurable* program[_args] for use in find_args()
505 505 # this way all Controller/EngineSetLaunchers have the same form, rather
506 506 # than *some* having `program_args` and others `controller_args`
507 507 @property
508 508 def program(self):
509 509 return self.engine_cmd
510 510
511 511 @property
512 512 def program_args(self):
513 513 return self.cluster_args + self.engine_args
514 514
515 515 def start(self, n):
516 516 """Start n engines by profile or profile_dir."""
517 517 self.n = n
518 518 return super(MPIEngineSetLauncher, self).start(n)
519 519
520 520 # deprecated MPIExec names
521 521 class DeprecatedMPILauncher(object):
522 522 def warn(self):
523 523 oldname = self.__class__.__name__
524 524 newname = oldname.replace('MPIExec', 'MPI')
525 525 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
526 526
527 527 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
528 528 """Deprecated, use MPILauncher"""
529 529 def __init__(self, *args, **kwargs):
530 530 super(MPIExecLauncher, self).__init__(*args, **kwargs)
531 531 self.warn()
532 532
533 533 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
534 534 """Deprecated, use MPIControllerLauncher"""
535 535 def __init__(self, *args, **kwargs):
536 536 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
537 537 self.warn()
538 538
539 539 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
540 540 """Deprecated, use MPIEngineSetLauncher"""
541 541 def __init__(self, *args, **kwargs):
542 542 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
543 543 self.warn()
544 544
545 545
546 546 #-----------------------------------------------------------------------------
547 547 # SSH launchers
548 548 #-----------------------------------------------------------------------------
549 549
550 550 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
551 551
552 552 class SSHLauncher(LocalProcessLauncher):
553 553 """A minimal launcher for ssh.
554 554
555 555 To be useful this will probably have to be extended to use the ``sshx``
556 556 idea for environment variables. There could be other things this needs
557 557 as well.
558 558 """
559 559
560 560 ssh_cmd = List(['ssh'], config=True,
561 561 help="command for starting ssh")
562 562 ssh_args = List(['-tt'], config=True,
563 563 help="args to pass to ssh")
564 564 scp_cmd = List(['scp'], config=True,
565 565 help="command for sending files")
566 566 program = List(['date'],
567 567 help="Program to launch via ssh")
568 568 program_args = List([],
569 569 help="args to pass to remote program")
570 570 hostname = Unicode('', config=True,
571 571 help="hostname on which to launch the program")
572 572 user = Unicode('', config=True,
573 573 help="username for ssh")
574 574 location = Unicode('', config=True,
575 575 help="user@hostname location for ssh in one setting")
576 576 to_fetch = List([], config=True,
577 577 help="List of (remote, local) files to fetch after starting")
578 578 to_send = List([], config=True,
579 579 help="List of (local, remote) files to send before starting")
580 580
581 581 def _hostname_changed(self, name, old, new):
582 582 if self.user:
583 583 self.location = u'%s@%s' % (self.user, new)
584 584 else:
585 585 self.location = new
586 586
587 587 def _user_changed(self, name, old, new):
588 588 self.location = u'%s@%s' % (new, self.hostname)
589 589
590 590 def find_args(self):
591 591 return self.ssh_cmd + self.ssh_args + [self.location] + \
592 592 list(map(pipes.quote, self.program + self.program_args))
593 593
594 594 def _send_file(self, local, remote):
595 595 """send a single file"""
596 remote = "%s:%s" % (self.location, remote)
596 full_remote = "%s:%s" % (self.location, remote)
597 597 for i in range(10):
598 598 if not os.path.exists(local):
599 599 self.log.debug("waiting for %s" % local)
600 600 time.sleep(1)
601 601 else:
602 602 break
603 603 remote_dir = os.path.dirname(remote)
604 604 self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
605 605 check_output(self.ssh_cmd + self.ssh_args + \
606 606 [self.location, 'mkdir', '-p', '--', remote_dir]
607 607 )
608 self.log.info("sending %s to %s", local, remote)
609 check_output(self.scp_cmd + [local, remote])
608 self.log.info("sending %s to %s", local, full_remote)
609 check_output(self.scp_cmd + [local, full_remote])
610 610
611 611 def send_files(self):
612 612 """send our files (called before start)"""
613 613 if not self.to_send:
614 614 return
615 615 for local_file, remote_file in self.to_send:
616 616 self._send_file(local_file, remote_file)
617 617
618 618 def _fetch_file(self, remote, local):
619 619 """fetch a single file"""
620 620 full_remote = "%s:%s" % (self.location, remote)
621 621 self.log.info("fetching %s from %s", local, full_remote)
622 622 for i in range(10):
623 623 # wait up to 10s for remote file to exist
624 624 check = check_output(self.ssh_cmd + self.ssh_args + \
625 625 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
626 626 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
627 627 if check == u'no':
628 628 time.sleep(1)
629 629 elif check == u'yes':
630 630 break
631 631 local_dir = os.path.dirname(local)
632 632 if not os.path.exists(local_dir):
633 633 os.makedirs(local_dir, 775)
634 634 check_output(self.scp_cmd + [full_remote, local])
635 635
636 636 def fetch_files(self):
637 637 """fetch remote files (called after start)"""
638 638 if not self.to_fetch:
639 639 return
640 640 for remote_file, local_file in self.to_fetch:
641 641 self._fetch_file(remote_file, local_file)
642 642
643 643 def start(self, hostname=None, user=None):
644 644 if hostname is not None:
645 645 self.hostname = hostname
646 646 if user is not None:
647 647 self.user = user
648 648
649 649 self.send_files()
650 650 super(SSHLauncher, self).start()
651 651 self.fetch_files()
652 652
653 653 def signal(self, sig):
654 654 if self.state == 'running':
655 655 # send escaped ssh connection-closer
656 656 self.process.stdin.write('~.')
657 657 self.process.stdin.flush()
658 658
659 659 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
660 660
661 661 remote_profile_dir = Unicode('', config=True,
662 662 help="""The remote profile_dir to use.
663 663
664 664 If not specified, use calling profile, stripping out possible leading homedir.
665 665 """)
666 666
667 667 def _profile_dir_changed(self, name, old, new):
668 668 if not self.remote_profile_dir:
669 669 # trigger remote_profile_dir_default logic again,
670 670 # in case it was already triggered before profile_dir was set
671 671 self.remote_profile_dir = self._strip_home(new)
672 672
673 673 @staticmethod
674 674 def _strip_home(path):
675 675 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
676 676 home = get_home_dir()
677 677 if not home.endswith('/'):
678 678 home = home+'/'
679 679
680 680 if path.startswith(home):
681 681 return path[len(home):]
682 682 else:
683 683 return path
684 684
685 685 def _remote_profile_dir_default(self):
686 686 return self._strip_home(self.profile_dir)
687 687
688 688 def _cluster_id_changed(self, name, old, new):
689 689 if new:
690 690 raise ValueError("cluster id not supported by SSH launchers")
691 691
692 692 @property
693 693 def cluster_args(self):
694 694 return ['--profile-dir', self.remote_profile_dir]
695 695
696 696 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
697 697
698 698 # alias back to *non-configurable* program[_args] for use in find_args()
699 699 # this way all Controller/EngineSetLaunchers have the same form, rather
700 700 # than *some* having `program_args` and others `controller_args`
701 701
702 702 def _controller_cmd_default(self):
703 703 return ['ipcontroller']
704 704
705 705 @property
706 706 def program(self):
707 707 return self.controller_cmd
708 708
709 709 @property
710 710 def program_args(self):
711 711 return self.cluster_args + self.controller_args
712 712
713 713 def _to_fetch_default(self):
714 714 return [
715 715 (os.path.join(self.remote_profile_dir, 'security', cf),
716 716 os.path.join(self.profile_dir, 'security', cf),)
717 717 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
718 718 ]
719 719
720 720 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
721 721
722 722 # alias back to *non-configurable* program[_args] for use in find_args()
723 723 # this way all Controller/EngineSetLaunchers have the same form, rather
724 724 # than *some* having `program_args` and others `controller_args`
725 725
726 726 def _engine_cmd_default(self):
727 727 return ['ipengine']
728 728
729 729 @property
730 730 def program(self):
731 731 return self.engine_cmd
732 732
733 733 @property
734 734 def program_args(self):
735 735 return self.cluster_args + self.engine_args
736 736
737 737 def _to_send_default(self):
738 738 return [
739 739 (os.path.join(self.profile_dir, 'security', cf),
740 740 os.path.join(self.remote_profile_dir, 'security', cf))
741 741 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
742 742 ]
743 743
744 744
745 745 class SSHEngineSetLauncher(LocalEngineSetLauncher):
746 746 launcher_class = SSHEngineLauncher
747 747 engines = Dict(config=True,
748 748 help="""dict of engines to launch. This is a dict by hostname of ints,
749 749 corresponding to the number of engines to start on that host.""")
750 750
751 751 def _engine_cmd_default(self):
752 752 return ['ipengine']
753 753
754 754 @property
755 755 def engine_count(self):
756 756 """determine engine count from `engines` dict"""
757 757 count = 0
758 758 for n in itervalues(self.engines):
759 759 if isinstance(n, (tuple,list)):
760 760 n,args = n
761 761 count += n
762 762 return count
763 763
764 764 def start(self, n):
765 765 """Start engines by profile or profile_dir.
766 766 `n` is ignored, and the `engines` config property is used instead.
767 767 """
768 768
769 769 dlist = []
770 770 for host, n in iteritems(self.engines):
771 771 if isinstance(n, (tuple, list)):
772 772 n, args = n
773 773 else:
774 774 args = copy.deepcopy(self.engine_args)
775 775
776 776 if '@' in host:
777 777 user,host = host.split('@',1)
778 778 else:
779 779 user=None
780 780 for i in range(n):
781 781 if i > 0:
782 782 time.sleep(self.delay)
783 783 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
784 784 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
785 785 )
786 786 if i > 0:
787 787 # only send files for the first engine on each host
788 788 el.to_send = []
789 789
790 790 # Copy the engine args over to each engine launcher.
791 791 el.engine_cmd = self.engine_cmd
792 792 el.engine_args = args
793 793 el.on_stop(self._notice_engine_stopped)
794 794 d = el.start(user=user, hostname=host)
795 795 self.launchers[ "%s/%i" % (host,i) ] = el
796 796 dlist.append(d)
797 797 self.notify_start(dlist)
798 798 return dlist
799 799
800 800
801 801 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
802 802 """Launcher for calling
803 803 `ipcluster engines` on a remote machine.
804 804
805 805 Requires that remote profile is already configured.
806 806 """
807 807
808 808 n = Integer()
809 809 ipcluster_cmd = List(['ipcluster'], config=True)
810 810
811 811 @property
812 812 def program(self):
813 813 return self.ipcluster_cmd + ['engines']
814 814
815 815 @property
816 816 def program_args(self):
817 817 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
818 818
819 819 def _to_send_default(self):
820 820 return [
821 821 (os.path.join(self.profile_dir, 'security', cf),
822 822 os.path.join(self.remote_profile_dir, 'security', cf))
823 823 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
824 824 ]
825 825
826 826 def start(self, n):
827 827 self.n = n
828 828 super(SSHProxyEngineSetLauncher, self).start()
829 829
830 830
831 831 #-----------------------------------------------------------------------------
832 832 # Windows HPC Server 2008 scheduler launchers
833 833 #-----------------------------------------------------------------------------
834 834
835 835
836 836 # This is only used on Windows.
837 837 def find_job_cmd():
838 838 if WINDOWS:
839 839 try:
840 840 return find_cmd('job')
841 841 except (FindCmdError, ImportError):
842 842 # ImportError will be raised if win32api is not installed
843 843 return 'job'
844 844 else:
845 845 return 'job'
846 846
847 847
848 848 class WindowsHPCLauncher(BaseLauncher):
849 849
850 850 job_id_regexp = CRegExp(r'\d+', config=True,
851 851 help="""A regular expression used to get the job id from the output of the
852 852 submit_command. """
853 853 )
854 854 job_file_name = Unicode(u'ipython_job.xml', config=True,
855 855 help="The filename of the instantiated job script.")
856 856 # The full path to the instantiated job script. This gets made dynamically
857 857 # by combining the work_dir with the job_file_name.
858 858 job_file = Unicode(u'')
859 859 scheduler = Unicode('', config=True,
860 860 help="The hostname of the scheduler to submit the job to.")
861 861 job_cmd = Unicode(find_job_cmd(), config=True,
862 862 help="The command for submitting jobs.")
863 863
864 864 def __init__(self, work_dir=u'.', config=None, **kwargs):
865 865 super(WindowsHPCLauncher, self).__init__(
866 866 work_dir=work_dir, config=config, **kwargs
867 867 )
868 868
869 869 @property
870 870 def job_file(self):
871 871 return os.path.join(self.work_dir, self.job_file_name)
872 872
873 873 def write_job_file(self, n):
874 874 raise NotImplementedError("Implement write_job_file in a subclass.")
875 875
876 876 def find_args(self):
877 877 return [u'job.exe']
878 878
879 879 def parse_job_id(self, output):
880 880 """Take the output of the submit command and return the job id."""
881 881 m = self.job_id_regexp.search(output)
882 882 if m is not None:
883 883 job_id = m.group()
884 884 else:
885 885 raise LauncherError("Job id couldn't be determined: %s" % output)
886 886 self.job_id = job_id
887 887 self.log.info('Job started with id: %r', job_id)
888 888 return job_id
889 889
890 890 def start(self, n):
891 891 """Start n copies of the process using the Win HPC job scheduler."""
892 892 self.write_job_file(n)
893 893 args = [
894 894 'submit',
895 895 '/jobfile:%s' % self.job_file,
896 896 '/scheduler:%s' % self.scheduler
897 897 ]
898 898 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
899 899
900 900 output = check_output([self.job_cmd]+args,
901 901 env=os.environ,
902 902 cwd=self.work_dir,
903 903 stderr=STDOUT
904 904 )
905 905 output = output.decode(DEFAULT_ENCODING, 'replace')
906 906 job_id = self.parse_job_id(output)
907 907 self.notify_start(job_id)
908 908 return job_id
909 909
910 910 def stop(self):
911 911 args = [
912 912 'cancel',
913 913 self.job_id,
914 914 '/scheduler:%s' % self.scheduler
915 915 ]
916 916 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
917 917 try:
918 918 output = check_output([self.job_cmd]+args,
919 919 env=os.environ,
920 920 cwd=self.work_dir,
921 921 stderr=STDOUT
922 922 )
923 923 output = output.decode(DEFAULT_ENCODING, 'replace')
924 924 except:
925 925 output = u'The job already appears to be stopped: %r' % self.job_id
926 926 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
927 927 return output
928 928
929 929
930 930 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
931 931
932 932 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
933 933 help="WinHPC xml job file.")
934 934 controller_args = List([], config=False,
935 935 help="extra args to pass to ipcontroller")
936 936
937 937 def write_job_file(self, n):
938 938 job = IPControllerJob(parent=self)
939 939
940 940 t = IPControllerTask(parent=self)
941 941 # The tasks work directory is *not* the actual work directory of
942 942 # the controller. It is used as the base path for the stdout/stderr
943 943 # files that the scheduler redirects to.
944 944 t.work_directory = self.profile_dir
945 945 # Add the profile_dir and from self.start().
946 946 t.controller_args.extend(self.cluster_args)
947 947 t.controller_args.extend(self.controller_args)
948 948 job.add_task(t)
949 949
950 950 self.log.debug("Writing job description file: %s", self.job_file)
951 951 job.write(self.job_file)
952 952
953 953 @property
954 954 def job_file(self):
955 955 return os.path.join(self.profile_dir, self.job_file_name)
956 956
957 957 def start(self):
958 958 """Start the controller by profile_dir."""
959 959 return super(WindowsHPCControllerLauncher, self).start(1)
960 960
961 961
962 962 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
963 963
964 964 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
965 965 help="jobfile for ipengines job")
966 966 engine_args = List([], config=False,
967 967 help="extra args to pas to ipengine")
968 968
969 969 def write_job_file(self, n):
970 970 job = IPEngineSetJob(parent=self)
971 971
972 972 for i in range(n):
973 973 t = IPEngineTask(parent=self)
974 974 # The tasks work directory is *not* the actual work directory of
975 975 # the engine. It is used as the base path for the stdout/stderr
976 976 # files that the scheduler redirects to.
977 977 t.work_directory = self.profile_dir
978 978 # Add the profile_dir and from self.start().
979 979 t.engine_args.extend(self.cluster_args)
980 980 t.engine_args.extend(self.engine_args)
981 981 job.add_task(t)
982 982
983 983 self.log.debug("Writing job description file: %s", self.job_file)
984 984 job.write(self.job_file)
985 985
986 986 @property
987 987 def job_file(self):
988 988 return os.path.join(self.profile_dir, self.job_file_name)
989 989
990 990 def start(self, n):
991 991 """Start the controller by profile_dir."""
992 992 return super(WindowsHPCEngineSetLauncher, self).start(n)
993 993
994 994
995 995 #-----------------------------------------------------------------------------
996 996 # Batch (PBS) system launchers
997 997 #-----------------------------------------------------------------------------
998 998
999 999 class BatchClusterAppMixin(ClusterAppMixin):
1000 1000 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
1001 1001 def _profile_dir_changed(self, name, old, new):
1002 1002 self.context[name] = new
1003 1003 _cluster_id_changed = _profile_dir_changed
1004 1004
1005 1005 def _profile_dir_default(self):
1006 1006 self.context['profile_dir'] = ''
1007 1007 return ''
1008 1008 def _cluster_id_default(self):
1009 1009 self.context['cluster_id'] = ''
1010 1010 return ''
1011 1011
1012 1012
1013 1013 class BatchSystemLauncher(BaseLauncher):
1014 1014 """Launch an external process using a batch system.
1015 1015
1016 1016 This class is designed to work with UNIX batch systems like PBS, LSF,
1017 1017 GridEngine, etc. The overall model is that there are different commands
1018 1018 like qsub, qdel, etc. that handle the starting and stopping of the process.
1019 1019
1020 1020 This class also has the notion of a batch script. The ``batch_template``
1021 1021 attribute can be set to a string that is a template for the batch script.
1022 1022 This template is instantiated using string formatting. Thus the template can
1023 1023 use {n} fot the number of instances. Subclasses can add additional variables
1024 1024 to the template dict.
1025 1025 """
1026 1026
1027 1027 # Subclasses must fill these in. See PBSEngineSet
1028 1028 submit_command = List([''], config=True,
1029 1029 help="The name of the command line program used to submit jobs.")
1030 1030 delete_command = List([''], config=True,
1031 1031 help="The name of the command line program used to delete jobs.")
1032 1032 job_id_regexp = CRegExp('', config=True,
1033 1033 help="""A regular expression used to get the job id from the output of the
1034 1034 submit_command.""")
1035 1035 job_id_regexp_group = Integer(0, config=True,
1036 1036 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1037 1037 batch_template = Unicode('', config=True,
1038 1038 help="The string that is the batch script template itself.")
1039 1039 batch_template_file = Unicode(u'', config=True,
1040 1040 help="The file that contains the batch template.")
1041 1041 batch_file_name = Unicode(u'batch_script', config=True,
1042 1042 help="The filename of the instantiated batch script.")
1043 1043 queue = Unicode(u'', config=True,
1044 1044 help="The PBS Queue.")
1045 1045
1046 1046 def _queue_changed(self, name, old, new):
1047 1047 self.context[name] = new
1048 1048
1049 1049 n = Integer(1)
1050 1050 _n_changed = _queue_changed
1051 1051
1052 1052 # not configurable, override in subclasses
1053 1053 # PBS Job Array regex
1054 1054 job_array_regexp = CRegExp('')
1055 1055 job_array_template = Unicode('')
1056 1056 # PBS Queue regex
1057 1057 queue_regexp = CRegExp('')
1058 1058 queue_template = Unicode('')
1059 1059 # The default batch template, override in subclasses
1060 1060 default_template = Unicode('')
1061 1061 # The full path to the instantiated batch script.
1062 1062 batch_file = Unicode(u'')
1063 1063 # the format dict used with batch_template:
1064 1064 context = Dict()
1065 1065
1066 1066 def _context_default(self):
1067 1067 """load the default context with the default values for the basic keys
1068 1068
1069 1069 because the _trait_changed methods only load the context if they
1070 1070 are set to something other than the default value.
1071 1071 """
1072 1072 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1073 1073
1074 1074 # the Formatter instance for rendering the templates:
1075 1075 formatter = Instance(EvalFormatter, (), {})
1076 1076
1077 1077 def find_args(self):
1078 1078 return self.submit_command + [self.batch_file]
1079 1079
1080 1080 def __init__(self, work_dir=u'.', config=None, **kwargs):
1081 1081 super(BatchSystemLauncher, self).__init__(
1082 1082 work_dir=work_dir, config=config, **kwargs
1083 1083 )
1084 1084 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1085 1085
1086 1086 def parse_job_id(self, output):
1087 1087 """Take the output of the submit command and return the job id."""
1088 1088 m = self.job_id_regexp.search(output)
1089 1089 if m is not None:
1090 1090 job_id = m.group(self.job_id_regexp_group)
1091 1091 else:
1092 1092 raise LauncherError("Job id couldn't be determined: %s" % output)
1093 1093 self.job_id = job_id
1094 1094 self.log.info('Job submitted with job id: %r', job_id)
1095 1095 return job_id
1096 1096
1097 1097 def write_batch_script(self, n):
1098 1098 """Instantiate and write the batch script to the work_dir."""
1099 1099 self.n = n
1100 1100 # first priority is batch_template if set
1101 1101 if self.batch_template_file and not self.batch_template:
1102 1102 # second priority is batch_template_file
1103 1103 with open(self.batch_template_file) as f:
1104 1104 self.batch_template = f.read()
1105 1105 if not self.batch_template:
1106 1106 # third (last) priority is default_template
1107 1107 self.batch_template = self.default_template
1108 1108 # add jobarray or queue lines to user-specified template
1109 1109 # note that this is *only* when user did not specify a template.
1110 1110 self._insert_queue_in_script()
1111 1111 self._insert_job_array_in_script()
1112 1112 script_as_string = self.formatter.format(self.batch_template, **self.context)
1113 1113 self.log.debug('Writing batch script: %s', self.batch_file)
1114 1114 with open(self.batch_file, 'w') as f:
1115 1115 f.write(script_as_string)
1116 1116 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1117 1117
1118 1118 def _insert_queue_in_script(self):
1119 1119 """Inserts a queue if required into the batch script.
1120 1120 """
1121 1121 if self.queue and not self.queue_regexp.search(self.batch_template):
1122 1122 self.log.debug("adding PBS queue settings to batch script")
1123 1123 firstline, rest = self.batch_template.split('\n',1)
1124 1124 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1125 1125
1126 1126 def _insert_job_array_in_script(self):
1127 1127 """Inserts a job array if required into the batch script.
1128 1128 """
1129 1129 if not self.job_array_regexp.search(self.batch_template):
1130 1130 self.log.debug("adding job array settings to batch script")
1131 1131 firstline, rest = self.batch_template.split('\n',1)
1132 1132 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1133 1133
1134 1134 def start(self, n):
1135 1135 """Start n copies of the process using a batch system."""
1136 1136 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1137 1137 # Here we save profile_dir in the context so they
1138 1138 # can be used in the batch script template as {profile_dir}
1139 1139 self.write_batch_script(n)
1140 1140 output = check_output(self.args, env=os.environ)
1141 1141 output = output.decode(DEFAULT_ENCODING, 'replace')
1142 1142
1143 1143 job_id = self.parse_job_id(output)
1144 1144 self.notify_start(job_id)
1145 1145 return job_id
1146 1146
1147 1147 def stop(self):
1148 1148 try:
1149 1149 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1150 1150 stdout=PIPE, stderr=PIPE)
1151 1151 out, err = p.communicate()
1152 1152 output = out + err
1153 1153 except:
1154 1154 self.log.exception("Problem stopping cluster with command: %s" %
1155 1155 (self.delete_command + [self.job_id]))
1156 1156 output = ""
1157 1157 output = output.decode(DEFAULT_ENCODING, 'replace')
1158 1158 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1159 1159 return output
1160 1160
1161 1161
1162 1162 class PBSLauncher(BatchSystemLauncher):
1163 1163 """A BatchSystemLauncher subclass for PBS."""
1164 1164
1165 1165 submit_command = List(['qsub'], config=True,
1166 1166 help="The PBS submit command ['qsub']")
1167 1167 delete_command = List(['qdel'], config=True,
1168 1168 help="The PBS delete command ['qsub']")
1169 1169 job_id_regexp = CRegExp(r'\d+', config=True,
1170 1170 help="Regular expresion for identifying the job ID [r'\d+']")
1171 1171
1172 1172 batch_file = Unicode(u'')
1173 1173 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1174 1174 job_array_template = Unicode('#PBS -t 1-{n}')
1175 1175 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1176 1176 queue_template = Unicode('#PBS -q {queue}')
1177 1177
1178 1178
1179 1179 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1180 1180 """Launch a controller using PBS."""
1181 1181
1182 1182 batch_file_name = Unicode(u'pbs_controller', config=True,
1183 1183 help="batch file name for the controller job.")
1184 1184 default_template= Unicode("""#!/bin/sh
1185 1185 #PBS -V
1186 1186 #PBS -N ipcontroller
1187 1187 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1188 1188 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1189 1189
1190 1190 def start(self):
1191 1191 """Start the controller by profile or profile_dir."""
1192 1192 return super(PBSControllerLauncher, self).start(1)
1193 1193
1194 1194
1195 1195 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1196 1196 """Launch Engines using PBS"""
1197 1197 batch_file_name = Unicode(u'pbs_engines', config=True,
1198 1198 help="batch file name for the engine(s) job.")
1199 1199 default_template= Unicode(u"""#!/bin/sh
1200 1200 #PBS -V
1201 1201 #PBS -N ipengine
1202 1202 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1203 1203 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1204 1204
1205 1205
1206 1206 #SGE is very similar to PBS
1207 1207
1208 1208 class SGELauncher(PBSLauncher):
1209 1209 """Sun GridEngine is a PBS clone with slightly different syntax"""
1210 1210 job_array_regexp = CRegExp('#\$\W+\-t')
1211 1211 job_array_template = Unicode('#$ -t 1-{n}')
1212 1212 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1213 1213 queue_template = Unicode('#$ -q {queue}')
1214 1214
1215 1215
1216 1216 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1217 1217 """Launch a controller using SGE."""
1218 1218
1219 1219 batch_file_name = Unicode(u'sge_controller', config=True,
1220 1220 help="batch file name for the ipontroller job.")
1221 1221 default_template= Unicode(u"""#$ -V
1222 1222 #$ -S /bin/sh
1223 1223 #$ -N ipcontroller
1224 1224 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1225 1225 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1226 1226
1227 1227 def start(self):
1228 1228 """Start the controller by profile or profile_dir."""
1229 1229 return super(SGEControllerLauncher, self).start(1)
1230 1230
1231 1231
1232 1232 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1233 1233 """Launch Engines with SGE"""
1234 1234 batch_file_name = Unicode(u'sge_engines', config=True,
1235 1235 help="batch file name for the engine(s) job.")
1236 1236 default_template = Unicode("""#$ -V
1237 1237 #$ -S /bin/sh
1238 1238 #$ -N ipengine
1239 1239 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1240 1240 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1241 1241
1242 1242
1243 1243 # LSF launchers
1244 1244
1245 1245 class LSFLauncher(BatchSystemLauncher):
1246 1246 """A BatchSystemLauncher subclass for LSF."""
1247 1247
1248 1248 submit_command = List(['bsub'], config=True,
1249 1249 help="The PBS submit command ['bsub']")
1250 1250 delete_command = List(['bkill'], config=True,
1251 1251 help="The PBS delete command ['bkill']")
1252 1252 job_id_regexp = CRegExp(r'\d+', config=True,
1253 1253 help="Regular expresion for identifying the job ID [r'\d+']")
1254 1254
1255 1255 batch_file = Unicode(u'')
1256 1256 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1257 1257 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1258 1258 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1259 1259 queue_template = Unicode('#BSUB -q {queue}')
1260 1260
1261 1261 def start(self, n):
1262 1262 """Start n copies of the process using LSF batch system.
1263 1263 This cant inherit from the base class because bsub expects
1264 1264 to be piped a shell script in order to honor the #BSUB directives :
1265 1265 bsub < script
1266 1266 """
1267 1267 # Here we save profile_dir in the context so they
1268 1268 # can be used in the batch script template as {profile_dir}
1269 1269 self.write_batch_script(n)
1270 1270 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1271 1271 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1272 1272 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1273 1273 output,err = p.communicate()
1274 1274 output = output.decode(DEFAULT_ENCODING, 'replace')
1275 1275 job_id = self.parse_job_id(output)
1276 1276 self.notify_start(job_id)
1277 1277 return job_id
1278 1278
1279 1279
1280 1280 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1281 1281 """Launch a controller using LSF."""
1282 1282
1283 1283 batch_file_name = Unicode(u'lsf_controller', config=True,
1284 1284 help="batch file name for the controller job.")
1285 1285 default_template= Unicode("""#!/bin/sh
1286 1286 #BSUB -J ipcontroller
1287 1287 #BSUB -oo ipcontroller.o.%%J
1288 1288 #BSUB -eo ipcontroller.e.%%J
1289 1289 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1290 1290 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1291 1291
1292 1292 def start(self):
1293 1293 """Start the controller by profile or profile_dir."""
1294 1294 return super(LSFControllerLauncher, self).start(1)
1295 1295
1296 1296
1297 1297 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1298 1298 """Launch Engines using LSF"""
1299 1299 batch_file_name = Unicode(u'lsf_engines', config=True,
1300 1300 help="batch file name for the engine(s) job.")
1301 1301 default_template= Unicode(u"""#!/bin/sh
1302 1302 #BSUB -oo ipengine.o.%%J
1303 1303 #BSUB -eo ipengine.e.%%J
1304 1304 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1305 1305 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1306 1306
1307 1307
1308 1308
1309 1309 class HTCondorLauncher(BatchSystemLauncher):
1310 1310 """A BatchSystemLauncher subclass for HTCondor.
1311 1311
1312 1312 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1313 1313 that the python instance but otherwise is very similar to PBS. This is because
1314 1314 HTCondor destroys sys.executable when launching remote processes - a launched
1315 1315 python process depends on sys.executable to effectively evaluate its
1316 1316 module search paths. Without it, regardless of which python interpreter you launch
1317 1317 you will get the to built in module search paths.
1318 1318
1319 1319 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1320 1320 this - the mechanism of shebanged scripts means that the python binary will be
1321 1321 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1322 1322 scripts on the remote node*. This means you need to take care that:
1323 1323
1324 1324 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1325 1325 of the python environment you wish to execute code in having top precedence.
1326 1326 b. This functionality is untested on Windows.
1327 1327
1328 1328 If you need different behavior, consider making you own template.
1329 1329 """
1330 1330
1331 1331 submit_command = List(['condor_submit'], config=True,
1332 1332 help="The HTCondor submit command ['condor_submit']")
1333 1333 delete_command = List(['condor_rm'], config=True,
1334 1334 help="The HTCondor delete command ['condor_rm']")
1335 1335 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1336 1336 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1337 1337 job_id_regexp_group = Integer(1, config=True,
1338 1338 help="""The group we wish to match in job_id_regexp [1]""")
1339 1339
1340 1340 job_array_regexp = CRegExp('queue\W+\$')
1341 1341 job_array_template = Unicode('queue {n}')
1342 1342
1343 1343
1344 1344 def _insert_job_array_in_script(self):
1345 1345 """Inserts a job array if required into the batch script.
1346 1346 """
1347 1347 if not self.job_array_regexp.search(self.batch_template):
1348 1348 self.log.debug("adding job array settings to batch script")
1349 1349 #HTCondor requires that the job array goes at the bottom of the script
1350 1350 self.batch_template = '\n'.join([self.batch_template,
1351 1351 self.job_array_template])
1352 1352
1353 1353 def _insert_queue_in_script(self):
1354 1354 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1355 1355 specified in the script.
1356 1356 """
1357 1357 pass
1358 1358
1359 1359
1360 1360 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1361 1361 """Launch a controller using HTCondor."""
1362 1362
1363 1363 batch_file_name = Unicode(u'htcondor_controller', config=True,
1364 1364 help="batch file name for the controller job.")
1365 1365 default_template = Unicode(r"""
1366 1366 universe = vanilla
1367 1367 executable = ipcontroller
1368 1368 # by default we expect a shared file system
1369 1369 transfer_executable = False
1370 1370 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1371 1371 """)
1372 1372
1373 1373 def start(self):
1374 1374 """Start the controller by profile or profile_dir."""
1375 1375 return super(HTCondorControllerLauncher, self).start(1)
1376 1376
1377 1377
1378 1378 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1379 1379 """Launch Engines using HTCondor"""
1380 1380 batch_file_name = Unicode(u'htcondor_engines', config=True,
1381 1381 help="batch file name for the engine(s) job.")
1382 1382 default_template = Unicode("""
1383 1383 universe = vanilla
1384 1384 executable = ipengine
1385 1385 # by default we expect a shared file system
1386 1386 transfer_executable = False
1387 1387 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1388 1388 """)
1389 1389
1390 1390
1391 1391 #-----------------------------------------------------------------------------
1392 1392 # A launcher for ipcluster itself!
1393 1393 #-----------------------------------------------------------------------------
1394 1394
1395 1395
1396 1396 class IPClusterLauncher(LocalProcessLauncher):
1397 1397 """Launch the ipcluster program in an external process."""
1398 1398
1399 1399 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1400 1400 help="Popen command for ipcluster")
1401 1401 ipcluster_args = List(
1402 1402 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1403 1403 help="Command line arguments to pass to ipcluster.")
1404 1404 ipcluster_subcommand = Unicode('start')
1405 1405 profile = Unicode('default')
1406 1406 n = Integer(2)
1407 1407
1408 1408 def find_args(self):
1409 1409 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1410 1410 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1411 1411 self.ipcluster_args
1412 1412
1413 1413 def start(self):
1414 1414 return super(IPClusterLauncher, self).start()
1415 1415
1416 1416 #-----------------------------------------------------------------------------
1417 1417 # Collections of launchers
1418 1418 #-----------------------------------------------------------------------------
1419 1419
1420 1420 local_launchers = [
1421 1421 LocalControllerLauncher,
1422 1422 LocalEngineLauncher,
1423 1423 LocalEngineSetLauncher,
1424 1424 ]
1425 1425 mpi_launchers = [
1426 1426 MPILauncher,
1427 1427 MPIControllerLauncher,
1428 1428 MPIEngineSetLauncher,
1429 1429 ]
1430 1430 ssh_launchers = [
1431 1431 SSHLauncher,
1432 1432 SSHControllerLauncher,
1433 1433 SSHEngineLauncher,
1434 1434 SSHEngineSetLauncher,
1435 1435 SSHProxyEngineSetLauncher,
1436 1436 ]
1437 1437 winhpc_launchers = [
1438 1438 WindowsHPCLauncher,
1439 1439 WindowsHPCControllerLauncher,
1440 1440 WindowsHPCEngineSetLauncher,
1441 1441 ]
1442 1442 pbs_launchers = [
1443 1443 PBSLauncher,
1444 1444 PBSControllerLauncher,
1445 1445 PBSEngineSetLauncher,
1446 1446 ]
1447 1447 sge_launchers = [
1448 1448 SGELauncher,
1449 1449 SGEControllerLauncher,
1450 1450 SGEEngineSetLauncher,
1451 1451 ]
1452 1452 lsf_launchers = [
1453 1453 LSFLauncher,
1454 1454 LSFControllerLauncher,
1455 1455 LSFEngineSetLauncher,
1456 1456 ]
1457 1457 htcondor_launchers = [
1458 1458 HTCondorLauncher,
1459 1459 HTCondorControllerLauncher,
1460 1460 HTCondorEngineSetLauncher,
1461 1461 ]
1462 1462 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1463 1463 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
General Comments 0
You need to be logged in to leave comments. Login now