##// END OF EJS Templates
use `-m` entry points in parallel launchers
MinRK -
Show More
@@ -1,1451 +1,1449 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # pre-2.7: backport check_output on top of Popen.
    from subprocess import CalledProcessError

    def check_output(*args, **kwargs):
        """Run a command, return its stdout.

        Matches the stdlib contract: raises CalledProcessError on a
        nonzero exit status (the previous shim silently returned output,
        diverging from Python >= 2.7 behavior).
        """
        kwargs.update(dict(stdout=PIPE))
        p = Popen(*args, **kwargs)
        out, err = p.communicate()
        if p.returncode != 0:
            # first positional arg (or args kwarg) is the command line
            cmd = kwargs.get('args') or (args[0] if args else None)
            raise CalledProcessError(p.returncode, cmd)
        return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.encoding import DEFAULT_ENCODING
65 65 from IPython.utils.path import get_home_dir
66 66 from IPython.utils.process import find_cmd, FindCmdError
67 67 from IPython.utils.py3compat import iteritems, itervalues
68 68
69 69 from .win32support import forward_read_events
70 70
71 71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
72 72
73 73 WINDOWS = os.name == 'nt'
74 74
75 75 #-----------------------------------------------------------------------------
76 76 # Paths to the kernel apps
77 77 #-----------------------------------------------------------------------------
78 78
# Launch each app via `python -m` so the sub-programs run under the same
# interpreter as this process, without relying on entry-point scripts on
# $PATH.  (A merge artifact had left the old `-c` based assignments
# interleaved here, which would have overridden two of the `-m` forms.)
ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]

ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]

ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
86 84
87 85 #-----------------------------------------------------------------------------
88 86 # Base launchers and errors
89 87 #-----------------------------------------------------------------------------
90 88
class LauncherError(Exception):
    """Base class for errors raised by launchers in this module."""
    pass
93 91
94 92
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state
    (e.g. starting a process that has already been started)."""
    pass
97 95
98 96
class UnknownStatus(LauncherError):
    """Launcher error for an unrecognized process status.

    Not raised in this chunk; presumably used by scheduler-backed
    launchers elsewhere in the file — verify against the full module."""
    pass
101 99
102 100
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # payloads recorded by notify_start/notify_stop, handed to callbacks
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before' # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subcommand should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        # was an if/else returning True/False; the comparison says it directly
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.

        If the process has already finished, ``f`` is invoked immediately
        with the recorded stop_data.
        """
        if self.state=='after':
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`."""

        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # drain the callback list (LIFO via pop) so each fires exactly once
        for i in range(len(self.stop_callbacks)):
            d = self.stop_callbacks.pop()
            d(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
214 212
class ClusterAppMixin(HasTraits):
    """MixIn for cluster args as traits"""
    profile_dir=Unicode('')
    cluster_id=Unicode('')

    @property
    def cluster_args(self):
        """Standard command-line args selecting the profile dir and cluster id."""
        return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
223 221
class ControllerMixin(ClusterAppMixin):
    """MixIn providing the ipcontroller command and arguments as config traits."""
    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
        help="""command-line args to pass to ipcontroller""")
230 228
class EngineMixin(ClusterAppMixin):
    """MixIn providing the ipengine command and arguments as config traits."""
    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
238 236
239 237
240 238 #-----------------------------------------------------------------------------
241 239 # Local process launchers
242 240 #-----------------------------------------------------------------------------
243 241
244 242
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    # how often to poll the child for exit
    poll_frequency = Integer(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        """Use the configured ``cmd_and_args`` verbatim."""
        return self.cmd_and_args

    def start(self):
        """Popen the child, hook its stdout/stderr into the ioloop, and
        begin polling for exit.

        Raises ProcessStateError if the launcher was already started.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state == 'before':
            self.process = Popen(self.args,
                stdout=PIPE,stderr=PIPE,stdin=PIPE,
                env=os.environ,
                cwd=self.work_dir
            )
            if WINDOWS:
                # Windows pipes can't go straight into the ioloop; bridge them
                self.stdout = forward_read_events(self.process.stdout)
                self.stderr = forward_read_events(self.process.stderr)
            else:
                self.stdout = self.process.stdout.fileno()
                self.stderr = self.process.stderr.fileno()
            self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
            self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
            self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
            self.poller.start()
            self.notify_start(self.process.pid)
        else:
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

    def stop(self):
        """Stop via interrupt-then-kill (see :meth:`interrupt_then_kill`)."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the child; on Windows (except SIGINT) use a
        taskkill tree-kill instead."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            self.log.debug("interrupt failed")
            pass
        # keep a reference so the delayed KILL callback is not collected
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        """ioloop read handler for the child's stdout."""
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        """ioloop read handler for the child's stderr."""
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check whether the child exited; if so, detach handlers and
        fire notify_stop with the exit code and pid."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
343 341
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Assemble the full ipcontroller command line."""
        argv = list(self.controller_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.controller_args)
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
353 351
354 352
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Assemble the full ipengine command line."""
        return self.engine_cmd + self.cluster_args + self.engine_args
360 358
361 359
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # index -> launcher for currently-running engines
    launchers = Dict()
    # index -> stop_data for engines that have stopped
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        dlist = []
        for i in range(n):
            if i > 0:
                # stagger starts (see `delay` help above)
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        # placeholder name; this launcher never spawns a process itself
        return ['engine set']

    def signal(self, sig):
        """Forward ``sig`` to every engine launcher."""
        dlist = []
        for el in itervalues(self.launchers):
            d = el.signal(sig)
            dlist.append(d)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt-then-kill every engine launcher."""
        dlist = []
        for el in itervalues(self.launchers):
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        return dlist

    def stop(self):
        """Stop all engines via interrupt_then_kill."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """on_stop callback: record the stop, and notify_stop once all
        engines have finished."""
        pid = data['pid']
        for idx,el in iteritems(self.launchers):
            if el.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
432 430
433 431
434 432 #-----------------------------------------------------------------------------
435 433 # MPI launchers
436 434 #-----------------------------------------------------------------------------
437 435
438 436
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of processes requested from mpiexec, set in start()
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        """Translate deprecated ``MPIExec*`` config section names to ``MPI*``."""
        # deprecation for old MPIExec names:
        config = kwargs.get('config', {})
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
476 474
477 475
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # Expose the controller command/args through the generic, *non-configurable*
    # program / program_args attributes that MPILauncher.find_args() consumes,
    # so every Controller/EngineSet launcher presents the same shape.
    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        full_args = list(self.cluster_args)
        full_args.extend(self.controller_args)
        return full_args

    def start(self):
        """Start the controller by profile_dir."""
        # a controller is always exactly one MPI process
        return super(MPIControllerLauncher, self).start(1)
495 493
496 494
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # Expose the engine command/args through the generic, *non-configurable*
    # program / program_args attributes that MPILauncher.find_args() consumes,
    # so every Controller/EngineSet launcher presents the same shape.
    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        full_args = list(self.cluster_args)
        full_args.extend(self.engine_args)
        return full_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        return super(MPIEngineSetLauncher, self).start(n)
515 513
516 514 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin that logs a deprecation warning for old MPIExec* launcher names."""

    def warn(self):
        """Log the rename from the (deprecated) MPIExec* name to MPI*."""
        old = self.__class__.__name__
        new = old.replace('MPIExec', 'MPI')
        self.log.warn("WARNING: %s name is deprecated, use %s", old, new)
522 520
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""
    def __init__(self, *args, **kwargs):
        # identical to MPILauncher, plus a deprecation warning
        super(MPIExecLauncher, self).__init__(*args, **kwargs)
        self.warn()
528 526
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""
    def __init__(self, *args, **kwargs):
        # identical to MPIControllerLauncher, plus a deprecation warning
        super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
        self.warn()
534 532
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""
    def __init__(self, *args, **kwargs):
        # identical to MPIEngineSetLauncher, plus a deprecation warning
        super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
        self.warn()
540 538
541 539
542 540 #-----------------------------------------------------------------------------
543 541 # SSH launchers
544 542 #-----------------------------------------------------------------------------
545 543
546 544 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
547 545
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        # keep `location` in sync when hostname is (re)assigned
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # keep `location` in sync when user is (re)assigned
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """Full ssh command line; the remote argv is shell-quoted."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file"""
        remote = "%s:%s" % (self.location, remote)
        # wait up to 10s for the local file to exist before scp'ing it
        for i in range(10):
            if not os.path.exists(local):
                self.log.debug("waiting for %s" % local)
                time.sleep(1)
            else:
                break
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file"""
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.decode(DEFAULT_ENCODING, 'replace').strip()
            if check == u'no':
                time.sleep(1)
            elif check == u'yes':
                break
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, launch the remote program via ssh, then fetch files."""
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        """Ignores `sig`; closes the ssh session via the '~.' escape."""
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
646 644
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
    """SSH launcher that knows about the *remote* profile directory."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        """Keep remote_profile_dir in sync with a late-set profile_dir."""
        if not self.remote_profile_dir:
            # trigger remote_profile_dir_default logic again,
            # in case it was already triggered before profile_dir was set
            self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        if not home.endswith('/'):
            home = home+'/'

        if path.startswith(home):
            return path[len(home):]
        else:
            return path

    def _remote_profile_dir_default(self):
        return self._strip_home(self.profile_dir)

    def _cluster_id_changed(self, name, old, new):
        # any non-empty cluster_id is rejected for SSH launchers
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Only --profile-dir (remote); cluster-id is unsupported over SSH."""
        return ['--profile-dir', self.remote_profile_dir]
683 681
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _controller_cmd_default(self):
        # plain `ipcontroller`, resolved on the remote host's PATH
        return ['ipcontroller']

    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args

    def _to_fetch_default(self):
        # fetch the controller's connection files from the remote security dir
        return [
            (os.path.join(self.remote_profile_dir, 'security', cf),
            os.path.join(self.profile_dir, 'security', cf),)
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
707 705
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _engine_cmd_default(self):
        # plain `ipengine`, resolved on the remote host's PATH
        return ['ipengine']

    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    def _to_send_default(self):
        # copy the controller's connection files to the remote security dir
        return [
            (os.path.join(self.profile_dir, 'security', cf),
            os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
731 729
732 730
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines across one or more hosts via ssh.

    The `engines` dict maps hostname (optionally ``user@host``) to either an
    int engine count, or a ``(count, args)`` pair giving per-host engine args.
    """
    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        # plain `ipengine`, resolved on each remote host's PATH
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        count = 0
        for n in itervalues(self.engines):
            if isinstance(n, (tuple, list)):
                n, args = n
            count += n
        return count

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        dlist = []
        for host, n in iteritems(self.engines):
            if isinstance(n, (tuple, list)):
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            for i in range(n):
                if i > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                                         profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                # deepcopy (matching LocalEngineSetLauncher.start) so a child
                # launcher cannot mutate the shared configured command list.
                el.engine_cmd = copy.deepcopy(self.engine_cmd)
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
787 785
788 786
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    # number of engines to request; set in start()
    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """Remote command: `ipcluster engines`."""
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        # copy the controller's connection files to the remote security dir
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]

    def start(self, n):
        """Request `n` engines via a single remote `ipcluster engines` call."""
        self.n = n
        # return super's result, for consistency with the other launchers
        return super(SSHProxyEngineSetLauncher, self).start()
818 816
819 817 #-----------------------------------------------------------------------------
820 818 # Windows HPC Server 2008 scheduler launchers
821 819 #-----------------------------------------------------------------------------
822 820
823 821
# The Windows-specific lookup only matters on Windows; elsewhere this is a no-op.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command.

    On Windows, try to resolve it via ``find_cmd``, falling back to the bare
    name ``'job'`` if the lookup fails or win32api is unavailable.  On other
    platforms, simply return ``'job'``.
    """
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
834 832
835 833
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher submitting jobs to the Windows HPC 2008 scheduler.

    Subclasses implement :meth:`write_job_file` to produce the XML job
    description consumed by ``job submit``.
    """

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the `job_file` property below,
    # so the trait value itself is never used — confirm before removing.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job XML file (work_dir/job_file_name)."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job (best-effort)."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        except Exception:
            # was a bare `except:`; narrowed so KeyboardInterrupt/SystemExit
            # are not swallowed.  Failure here usually means the job is gone.
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
916 914
917 915
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch an ipcontroller as a single-task Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write the WinHPC XML job description for a one-task controller job."""
        job = IPControllerJob(parent=self)

        t = IPControllerTask(parent=self)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir and from self.start().
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # Unlike the base class, the job file lives in the profile dir.
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        # a controller is always a single process
        return super(WindowsHPCControllerLauncher, self).start(1)
948 946
949 947
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of ipengines as one multi-task Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write the WinHPC XML job description with one task per engine."""
        job = IPEngineSetJob(parent=self)

        for i in range(n):
            t = IPEngineTask(parent=self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and from self.start().
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # Unlike the base class, the job file lives in the profile dir.
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
981 979
982 980
983 981 #-----------------------------------------------------------------------------
984 982 # Batch (PBS) system launchers
985 983 #-----------------------------------------------------------------------------
986 984
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
    # Trait-change hook: mirror profile_dir (and, via the alias below,
    # cluster_id) into the batch-template format context.
    def _profile_dir_changed(self, name, old, new):
        self.context[name] = new
    # The hook body is identical for cluster_id, so alias it.
    _cluster_id_changed = _profile_dir_changed

    # Dynamic defaults also seed the context, because the *_changed hooks
    # above only fire when a trait is set to a non-default value.
    def _profile_dir_default(self):
        self.context['profile_dir'] = ''
        return ''
    def _cluster_id_default(self):
        self.context['cluster_id'] = ''
        return ''
999 997
1000 998
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    job_id_regexp_group = Integer(0, config=True,
        help="""The group we wish to match in job_id_regexp (0 to match all)""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # mirror the trait into the template-rendering context
        self.context[name] = new

    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        """The submit command followed by the instantiated batch script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group(self.job_id_regexp_group)
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template
            # add jobarray or queue lines to user-specified template
            # note that this is *only* when user did not specify a template.
            self._insert_queue_in_script()
            self._insert_job_array_in_script()
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # make the generated script executable for the submitting user
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def _insert_queue_in_script(self):
        """Inserts a queue if required into the batch script.
        """
        if self.queue and not self.queue_regexp.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Delete the job with ``delete_command`` and notify observers.

        Returns the decoded stdout+stderr of the delete command, or an
        empty string when running it failed.
        """
        try:
            p = Popen(self.delete_command+[self.job_id], env=os.environ,
                stdout=PIPE, stderr=PIPE)
            out, err = p.communicate()
            # decode while we still have bytes; the except branch below
            # yields a str, which has no .decode under Python 3
            output = (out + err).decode(DEFAULT_ENCODING, 'replace')
        except Exception:
            # was a bare `except:`, which also swallowed KeyboardInterrupt
            self.log.exception("Problem stopping cluster with command: %s" %
                               (self.delete_command + [self.job_id]))
            output = ""
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
1148 1146
1149 1147
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # help previously said ['qsub'], which is the *submit* command
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # raw strings: these patterns contain '\W'/'\w'/'\d'/'\$' escapes that are
    # invalid escape sequences in plain string literals
    job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1165 1163
1166 1164
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # The %s is filled in *now* with the shell-quoted ipcontroller command
    # line; {profile_dir}/{cluster_id} are rendered later from the context.
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller is always a single process
        return super(PBSControllerLauncher, self).start(1)
1181 1179
1182 1180
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # The %s is filled in *now* with the shell-quoted ipengine command line;
    # the #PBS -t job-array line is inserted by the base class when needed.
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1192 1190
1193 1191
1194 1192 #SGE is very similar to PBS
1195 1193
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # raw strings: '\$' and '\-' are invalid escape sequences in plain
    # string literals (DeprecationWarning on modern Pythons); the pattern
    # bytes are unchanged
    job_array_regexp = CRegExp(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1202 1200
1203 1201
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    # help text previously read "ipontroller"
    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # The %s is filled in *now* with the shell-quoted ipcontroller command line.
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller is always a single process
        return super(SGEControllerLauncher, self).start(1)
1218 1216
1219 1217
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # The %s is filled in *now* with the shell-quoted ipengine command line;
    # the #$ -t job-array line is inserted by the base class when needed.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1229 1227
1230 1228
1231 1229 # LSF launchers
1232 1230
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    # help strings previously said "PBS" for these LSF commands
    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # raw strings: these patterns contain '\w'/'\d'/'\[' escapes that are
    # invalid escape sequences in plain string literals
    job_array_regexp = CRegExp(r'#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = CRegExp(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using the LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives:
        bsub < script
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
        output,err = p.communicate()
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1266 1264
1267 1265
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # %%J becomes a literal %J (LSF's job-id placeholder) because the
    # template string is %-formatted with the ipcontroller command below.
    default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller is always a single process
        return super(LSFControllerLauncher, self).start(1)
1283 1281
1284 1282
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""
    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # %%J becomes a literal %J (LSF's job-id placeholder) because the
    # template string is %-formatted with the ipengine command below.
    default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1294 1292
1295 1293
1296 1294
class HTCondorLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for HTCondor.

    HTCondor requires that we launch the ipengine/ipcontroller scripts rather
    than the python instance but otherwise is very similar to PBS. This is because
    HTCondor destroys sys.executable when launching remote processes - a launched
    python process depends on sys.executable to effectively evaluate its
    module search paths. Without it, regardless of which python interpreter you launch
    you will only get the built-in module search paths.

    We use the ip{cluster, engine, controller} scripts as our executable to circumvent
    this - the mechanism of shebanged scripts means that the python binary will be
    launched with argv[0] set to the *location of the ip{cluster, engine, controller}
    scripts on the remote node*. This means you need to take care that:

    a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
       of the python environment you wish to execute code in having top precedence.
    b. This functionality is untested on Windows.

    If you need different behavior, consider making your own template.
    """

    submit_command = List(['condor_submit'], config=True,
        help="The HTCondor submit command ['condor_submit']")
    delete_command = List(['condor_rm'], config=True,
        help="The HTCondor delete command ['condor_rm']")
    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
        help=r"Regular expression for identifying the job ID [r'(\d+)\.$']")
    job_id_regexp_group = Integer(1, config=True,
        help="""The group we wish to match in job_id_regexp [1]""")

    # raw string: '\W' is an invalid escape sequence in a plain literal
    job_array_regexp = CRegExp(r'queue\W+\$')
    job_array_template = Unicode('queue {n}')


    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            #HTCondor requires that the job array goes at the bottom of the script
            self.batch_template = '\n'.join([self.batch_template,
                self.job_array_template])

    def _insert_queue_in_script(self):
        """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
        specified in the script.
        """
        pass
1346 1344
1347 1345
class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch a controller using HTCondor."""

    batch_file_name = Unicode(u'htcondor_controller', config=True,
        help="batch file name for the controller job.")
    # Submit description file: runs the `ipcontroller` script found on the
    # remote node's PATH rather than sys.executable (see HTCondorLauncher).
    default_template = Unicode(r"""
universe = vanilla
executable = ipcontroller
# by default we expect a shared file system
transfer_executable = False
arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""")

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller is always a single process
        return super(HTCondorControllerLauncher, self).start(1)
1364 1362
1365 1363
class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch Engines using HTCondor"""
    batch_file_name = Unicode(u'htcondor_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Submit description file; the `queue {n}` line is appended at the bottom
    # by HTCondorLauncher._insert_job_array_in_script when not present.
    default_template = Unicode("""
universe = vanilla
executable = ipengine
# by default we expect a shared file system
transfer_executable = False
arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""")
1377 1375
1378 1376
1379 1377 #-----------------------------------------------------------------------------
1380 1378 # A launcher for ipcluster itself!
1381 1379 #-----------------------------------------------------------------------------
1382 1380
1383 1381
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    ipcluster_subcommand = Unicode('start')
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        """Assemble the full ipcluster command line."""
        cmd = list(self.ipcluster_cmd)
        cmd.append(self.ipcluster_subcommand)
        cmd.append('--n=%i' % self.n)
        cmd.append('--profile=%s' % self.profile)
        return cmd + self.ipcluster_args

    def start(self):
        """Launch ipcluster as a local subprocess."""
        return super(IPClusterLauncher, self).start()
1403 1401
1404 1402 #-----------------------------------------------------------------------------
1405 1403 # Collections of launchers
1406 1404 #-----------------------------------------------------------------------------
1407 1405
# Launchers that run everything as local subprocesses.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]
# Launchers that start processes via mpiexec/mpirun.
mpi_launchers = [
    MPILauncher,
    MPIControllerLauncher,
    MPIEngineSetLauncher,
]
# Launchers that start remote processes over SSH.
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
    SSHProxyEngineSetLauncher,
]
# Windows HPC scheduler launchers.
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
# PBS batch-system launchers.
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]
# Sun GridEngine (PBS-clone) launchers.
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]
# LSF batch-system launchers.
lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]
# HTCondor launchers.
htcondor_launchers = [
    HTCondorLauncher,
    HTCondorControllerLauncher,
    HTCondorEngineSetLauncher,
]
# Every launcher class defined in this module.
all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
                + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
General Comments 0
You need to be logged in to leave comments. Login now