##// END OF EJS Templates
remove debug print statements
James Booth -
Show More
@@ -1,1462 +1,1460 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # pre-2.7, define check_output with Popen
    def check_output(*args, **kwargs):
        """Minimal backport of :func:`subprocess.check_output` (Python 2.7+).

        Runs the command, captures stdout, and — matching the stdlib
        semantics — raises CalledProcessError on a non-zero exit status
        instead of silently returning the output.
        """
        from subprocess import CalledProcessError
        kwargs['stdout'] = PIPE
        p = Popen(*args, **kwargs)
        out, err = p.communicate()
        retcode = p.poll()
        if retcode:
            # mirror subprocess.check_output: failure is an exception
            raise CalledProcessError(retcode, args[0] if args else kwargs.get('args'))
        return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.encoding import DEFAULT_ENCODING
65 65 from IPython.utils.path import get_home_dir
66 66 from IPython.utils.process import find_cmd, FindCmdError
67 67
68 68 from .win32support import forward_read_events
69 69
70 70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 71
WINDOWS = os.name == 'nt'

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------

# Template for launching an IPython.parallel app by module name under the
# same interpreter as this process.
cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"

ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]

ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]

ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]

# HTCondor frustratingly destroys sys.executable when launching remote processes
# thus Python will default back to the system module search paths which is
# ridiculously fragile (not to mention destructive for virtualenvs).
# however, if we use the ip{cluster, engine, controller} scripts as our
# executable we circumvent this - the mechanism of shebanged scripts means that
# the python binary will be launched with argv[0] set correctly.
# This does mean that for HTCondor we require that:
# a. The python interpreter you are using is in a folder next to the ipengine,
#    ipcluster and ipcontroller scripts
# b. I have no idea what the consequences are for Windows.
bin_dir = os.path.dirname(sys.executable)

# NOTE(review): despite the *_cmd_argv names, these three are plain path
# strings, not argv lists like the variables above.
condor_ipcluster_cmd_argv = os.path.join(bin_dir, 'ipcluster')

condor_ipengine_cmd_argv = os.path.join(bin_dir, 'ipengine')

condor_ipcontroller_cmd_argv = os.path.join(bin_dir, 'ipcontroller')
103 103
104 104 #-----------------------------------------------------------------------------
105 105 # Base launchers and errors
106 106 #-----------------------------------------------------------------------------
107 107
class LauncherError(Exception):
    """Base class for all errors raised by launchers in this module."""
    pass
110 110
111 111
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state
    (e.g. starting a process that was already started)."""
    pass
114 114
115 115
class UnknownStatus(LauncherError):
    """Raised when a launcher cannot determine the status of its process."""
    pass
118 118
119 119
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before'  # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.
        """
        if self.state == 'after':
            # already stopped: fire the callback immediately with the
            # recorded stop_data
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`."""

        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # drain the callbacks LIFO, emptying the list as we go
        # (same pop() order as the original range/pop loop)
        while self.stop_callbacks:
            self.stop_callbacks.pop()(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
231 231
class ClusterAppMixin(HasTraits):
    """MixIn for cluster args as traits"""
    profile_dir=Unicode('')
    cluster_id=Unicode('')

    @property
    def cluster_args(self):
        # common command-line args identifying the cluster to any IPython app
        return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
240 240
class ControllerMixin(ClusterAppMixin):
    """MixIn providing the ipcontroller command and arguments as traits."""
    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
        help="""command-line args to pass to ipcontroller""")
247 247
class EngineMixin(ClusterAppMixin):
    """MixIn providing the ipengine command and arguments as traits."""
    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
255 255
256 256
257 257 #-----------------------------------------------------------------------------
258 258 # Local process launchers
259 259 #-----------------------------------------------------------------------------
260 260
261 261
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    poll_frequency = Integer(100)  # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Launch the subprocess and wire its stdout/stderr into the loop."""
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)
        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            # pipes cannot be registered with the loop directly on Windows;
            # forward read events through a helper
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            # best-effort: the process may already be gone
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _handle_output(self, reader, pipe):
        """Forward one line of child output to the log.

        Shared implementation for the stdout/stderr handlers; `reader` is
        the Windows event-forwarder, `pipe` the POSIX pipe object.
        """
        if WINDOWS:
            line = reader.recv()
        else:
            line = pipe.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        self._handle_output(self.stdout, self.process.stdout)

    def handle_stderr(self, fd, events):
        self._handle_output(self.stderr, self.process.stderr)

    def poll(self):
        """Check whether the child exited; if so, tear down handlers and notify."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
360 360
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Assemble the full ipcontroller command line."""
        argv = list(self.controller_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.controller_args)
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
370 370
371 371
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        # full argv: engine command + cluster identification + engine args
        return self.engine_cmd + self.cluster_args + self.engine_args
377 377
378 378
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        dlist = []
        for i in range(n):
            if i > 0:
                # stagger startup to keep engine ids ordered and limit
                # process flood
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Forward `sig` to every engine in the set."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """INT-then-KILL every engine in the set."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        return dlist

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Record one engine's stop_data; notify when the whole set is down."""
        pid = data['pid']
        for idx, el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        else:
            # No launcher matched this pid.  The original code fell through
            # and popped whichever key the loop ended on (or raised
            # NameError on an empty dict) — ignore the event instead.
            self.log.warn("Stop event for unknown engine pid: %r", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
449 449
450 450
451 451 #-----------------------------------------------------------------------------
452 452 # MPI launchers
453 453 #-----------------------------------------------------------------------------
454 454
455 455
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of processes requested from mpiexec; set by start(n)
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        # deprecation for old MPIExec names:
        # rewrite config given under the pre-rename MPIExec* section names
        # into the corresponding MPI* sections before traits are loaded.
        config = kwargs.get('config', {})
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
493 493
494 494
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        # a controller is always a single mpi process
        return super(MPIControllerLauncher, self).start(1)
512 512
513 513
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        # MPILauncher.start(n) assigns self.n again, so this line is
        # redundant but harmless
        self.n = n
        return super(MPIEngineSetLauncher, self).start(n)
532 532
# deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin that warns that an old MPIExec* class name is deprecated."""
    def warn(self):
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
539 539
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecLauncher, self).__init__(*args, **kwargs)
        # emit the deprecation warning on construction
        self.warn()
545 545
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
        # emit the deprecation warning on construction
        self.warn()
551 551
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
        # emit the deprecation warning on construction
        self.warn()
557 557
558 558
559 559 #-----------------------------------------------------------------------------
560 560 # SSH launchers
561 561 #-----------------------------------------------------------------------------
562 562
563 563 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
564 564
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        # keep `location` in sync with hostname/user
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # keep `location` in sync with hostname/user
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        # quote the remote command so it survives the remote shell
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file"""
        remote = "%s:%s" % (self.location, remote)
        for i in range(10):
            # wait up to 10s for the local file to appear before copying;
            # NOTE(review): if it never appears, scp below is attempted anyway
            if not os.path.exists(local):
                self.log.debug("waiting for %s" % local)
                time.sleep(1)
            else:
                break
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file"""
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.decode(DEFAULT_ENCODING, 'replace').strip()
            if check == u'no':
                time.sleep(1)
            elif check == u'yes':
                break
        # NOTE(review): if the remote file never appeared, the scp below is
        # still attempted and will fail
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, then launch the remote program via ssh."""
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        # `sig` is ignored: closing the ssh session ends the remote process
        # regardless of the signal requested
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
663 663
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
    """Base class for ssh launchers that are aware of IPython profile dirs."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        if not self.remote_profile_dir:
            # trigger remote_profile_dir_default logic again,
            # in case it was already triggered before profile_dir was set
            self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        if not home.endswith('/'):
            home = home+'/'

        if path.startswith(home):
            return path[len(home):]
        else:
            return path

    def _remote_profile_dir_default(self):
        return self._strip_home(self.profile_dir)

    def _cluster_id_changed(self, name, old, new):
        # a cluster_id would alter the expected connection-file names on the
        # remote end, which these launchers do not handle
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        # unlike ClusterAppMixin: use the *remote* profile dir, no cluster-id
        return ['--profile-dir', self.remote_profile_dir]
700 700
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _controller_cmd_default(self):
        # the plain script name, since sys.executable is meaningless remotely
        return ['ipcontroller']

    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args

    def _to_fetch_default(self):
        # fetch the connection files written by the remote controller
        return [
            (os.path.join(self.remote_profile_dir, 'security', cf),
             os.path.join(self.profile_dir, 'security', cf),)
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
724 724
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _engine_cmd_default(self):
        # the plain script name, since sys.executable is meaningless remotely
        return ['ipengine']

    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    def _to_send_default(self):
        # send the connection files the remote engine will need
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
748 748
749 749
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines on remote hosts via ssh."""
    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        # the plain script name, since sys.executable is meaningless remotely
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        count = 0
        for n in self.engines.itervalues():
            # entries may be either `n` or `(n, args)`
            if isinstance(n, (tuple,list)):
                n,args = n
            count += n
        return count

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        dlist = []
        for host, n in self.engines.iteritems():
            # each value may be either `n` or `(n, args)`
            if isinstance(n, (tuple, list)):
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                if i > 0:
                    # stagger startup, as in LocalEngineSetLauncher
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                        profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                # deepcopy so launchers cannot mutate each other's argv
                # (matches LocalEngineSetLauncher.start, which deepcopies;
                # the original shared self.engine_cmd across all launchers)
                el.engine_cmd = copy.deepcopy(self.engine_cmd)
                el.engine_args = copy.deepcopy(args)
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers[ "%s/%i" % (host,i) ] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
804 804
805 805
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    # number of engines to request; set by start(n)
    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        # send the connection files the remote engines will need
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]

    def start(self, n):
        """Start n engines remotely via `ipcluster engines`."""
        self.n = n
        super(SSHProxyEngineSetLauncher, self).start()
834 834
835 835
836 836 #-----------------------------------------------------------------------------
837 837 # Windows HPC Server 2008 scheduler launchers
838 838 #-----------------------------------------------------------------------------
839 839
840 840
# This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command, falling back to the bare name."""
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
851 851
852 852
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits jobs to the Windows HPC 2008 scheduler."""

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script is exposed by the
    # `job_file` property below, which combines work_dir with job_file_name.
    # (The old `job_file = Unicode(u'')` trait here was dead code: the
    # property definition immediately shadowed it in the class namespace.)
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """The full path to the instantiated job script."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job (best-effort) and notify observers."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        except Exception:
            # was a bare `except:`; cancel is best-effort — report the job
            # as already stopped instead of propagating
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
933 933
934 934
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch an ipcontroller via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a one-task WinHPC XML job file for the controller."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir and from self.start().
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # Unlike the base class, the job file lives in the profile dir.
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        return super(WindowsHPCControllerLauncher, self).start(1)
965 965
966 966
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of ipengines via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write a WinHPC XML job file containing one task per engine."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and from self.start().
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # Unlike the base class, the job file lives in the profile dir.
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        # docstring previously said "Start the controller" (copy-paste error)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
998 998
999 999
1000 1000 #-----------------------------------------------------------------------------
1001 1001 # Batch (PBS) system launchers
1002 1002 #-----------------------------------------------------------------------------
1003 1003
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
    def _profile_dir_changed(self, name, old, new):
        # Mirror trait changes into the batch-template formatting context.
        self.context[name] = new
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        # Defaults must be seeded into the context explicitly, because the
        # *_changed handlers only fire for non-default assignments.
        self.context['profile_dir'] = ''
        return ''
    def _cluster_id_default(self):
        self.context['cluster_id'] = ''
        return ''
1016 1016
1017 1017
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    job_id_regexp_group = Integer(0, config=True,
        help="""The group we wish to match in job_id_regexp (0 to match all)""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # Keep the template-formatting context in sync with the trait.
        self.context[name] = new

    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group(self.job_id_regexp_group)
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template
            # add jobarray or queue lines to user-specified template
            # note that this is *only* when user did not specify a template.
            self._insert_queue_in_script()
            self._insert_job_array_in_script()
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # make the script executable for the submitting user
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def _insert_queue_in_script(self):
        """Inserts a queue if required into the batch script.
        """
        # (leftover debug print statement removed here)
        if self.queue and not self.queue_regexp.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            firstline, rest = self.batch_template.split('\n', 1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        # (leftover debug print statement removed here)
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            firstline, rest = self.batch_template.split('\n', 1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Delete the submitted job via `delete_command` and notify observers."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
1159 1157
1160 1158
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Detect job-array / queue directives already present in a user template.
    job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1176 1174
1177 1175
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # Default PBS script; {profile_dir}/{cluster_id} are filled from context.
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))


    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(PBSControllerLauncher, self).start(1)
1193 1191
1194 1192
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Default PBS script; {profile_dir}/{cluster_id} are filled from context.
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipengine_cmd_argv))))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(PBSEngineSetLauncher, self).start(n)
1208 1206
1209 1207
1210 1208 #SGE is very similar to PBS
1211 1209
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # SGE uses '#$' directives in place of PBS's '#PBS'.
    job_array_regexp = CRegExp('#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1218 1216
1219 1217
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # Default SGE script; {profile_dir}/{cluster_id} are filled from context.
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(SGEControllerLauncher, self).start(1)
1234 1232
1235 1233
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Default SGE script; {profile_dir}/{cluster_id} are filled from context.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(SGEEngineSetLauncher, self).start(n)
1249 1247
1250 1248
1251 1249 # LSF launchers
1252 1250
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Detect job-array / queue directives already present in a user template.
    job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives:
        bsub < script
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, err = p.communicate()
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1286 1284
1287 1285
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # Default LSF script; %%J is LSF's job-id placeholder (escaped through %-format).
    default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(LSFControllerLauncher, self).start(1)
1303 1301
1304 1302
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""
    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Default LSF script; %%J is LSF's job-id placeholder (escaped through %-format).
    default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(LSFEngineSetLauncher, self).start(n)
1318 1316
1319 1317
1320 1318 # Condor Requires that we launch the ipengine/ipcontroller scripts rather
1321 1319 # that the python instance but otherwise is very similar to PBS
1322 1320
class CondorLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for Condor."""

    submit_command = List(['condor_submit'], config=True,
        help="The Condor submit command ['condor_submit']")
    delete_command = List(['condor_rm'], config=True,
        help="The Condor delete command ['condor_rm']")
    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
        help="Regular expression for identifying the job ID [r'(\d+)\.$']")
    job_id_regexp_group = Integer(1, config=True,
        help="""The group we wish to match in job_id_regexp [1]""")

    job_array_regexp = CRegExp('queue\W+\$')
    job_array_template = Unicode('queue {n}')


    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            #Condor requires that the job array goes at the bottom of the script
            self.batch_template = '\n'.join([self.batch_template,
                self.job_array_template])

    def _insert_queue_in_script(self):
        """AFAIK, Condor doesn't have a concept of multiple queues that can be
        specified in the script..
        """
        pass
1353 1351
1354 1352
class CondorControllerLauncher(CondorLauncher, BatchClusterAppMixin):
    """Launch a controller using Condor."""

    batch_file_name = Unicode(u'condor_controller', config=True,
        help="batch file name for the controller job.")
    # Default Condor submit description for the controller.
    default_template = Unicode(r"""
universe = vanilla
executable = %s
# by default we expect a shared file system
transfer_executable = False
arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""" % condor_ipcontroller_cmd_argv)

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(CondorControllerLauncher, self).start(1)
1371 1369
1372 1370
class CondorEngineSetLauncher(CondorLauncher, BatchClusterAppMixin):
    """Launch Engines using Condor"""
    batch_file_name = Unicode(u'condor_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Default Condor submit description for the engine set.
    default_template = Unicode("""
universe = vanilla
executable = %s
# by default we expect a shared file system
transfer_executable = False
arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""" % condor_ipengine_cmd_argv)

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(CondorEngineSetLauncher, self).start(n)
1388 1386
1389 1387
1390 1388 #-----------------------------------------------------------------------------
1391 1389 # A launcher for ipcluster itself!
1392 1390 #-----------------------------------------------------------------------------
1393 1391
1394 1392
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    # Subcommand passed to ipcluster (e.g. 'start').
    ipcluster_subcommand = Unicode('start')
    # Profile name, passed as --profile.
    profile = Unicode('default')
    # Number of engines, passed as --n.
    n = Integer(2)

    def find_args(self):
        """Assemble the full ipcluster command line."""
        return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
               ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
               self.ipcluster_args

    def start(self):
        return super(IPClusterLauncher, self).start()
1414 1412
1415 1413 #-----------------------------------------------------------------------------
1416 1414 # Collections of launchers
1417 1415 #-----------------------------------------------------------------------------
1418 1416
# Registry of launcher classes grouped by backend; `all_launchers` is the
# flat union used by the cluster application machinery.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]
mpi_launchers = [
    MPILauncher,
    MPIControllerLauncher,
    MPIEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
    SSHProxyEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]
lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]
condor_launchers = [
    CondorLauncher,
    CondorControllerLauncher,
    CondorEngineSetLauncher,
]
all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
                + pbs_launchers + sge_launchers + lsf_launchers + condor_launchers
General Comments 0
You need to be logged in to leave comments. Login now