##// END OF EJS Templates
use [sys.exe, "-c", "…launch_new_instance()"] in launchers...
MinRK -
Show More
@@ -1,1348 +1,1344
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import re
26 26 import stat
27 import sys
27 28 import time
28 29
29 30 # signal imports, handling various platforms, versions
30 31
31 32 from signal import SIGINT, SIGTERM
32 33 try:
33 34 from signal import SIGKILL
34 35 except ImportError:
35 36 # Windows
36 37 SIGKILL=SIGTERM
37 38
38 39 try:
39 40 # Windows >= 2.7, 3.2
40 41 from signal import CTRL_C_EVENT as SIGINT
41 42 except ImportError:
42 43 pass
43 44
44 45 from subprocess import Popen, PIPE, STDOUT
45 46 try:
46 47 from subprocess import check_output
47 48 except ImportError:
48 49 # pre-2.7, define check_output with Popen
49 50 def check_output(*args, **kwargs):
50 51 kwargs.update(dict(stdout=PIPE))
51 52 p = Popen(*args, **kwargs)
52 53 out,err = p.communicate()
53 54 return out
54 55
55 56 from zmq.eventloop import ioloop
56 57
57 58 from IPython.config.application import Application
58 59 from IPython.config.configurable import LoggingConfigurable
59 60 from IPython.utils.text import EvalFormatter
60 61 from IPython.utils.traitlets import (
61 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 63 )
63 from IPython.utils.path import get_ipython_module_path, get_home_dir
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.path import get_home_dir
65 from IPython.utils.process import find_cmd, FindCmdError
65 66
66 67 from .win32support import forward_read_events
67 68
68 69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 70
70 71 WINDOWS = os.name == 'nt'
71 72
72 73 #-----------------------------------------------------------------------------
73 74 # Paths to the kernel apps
74 75 #-----------------------------------------------------------------------------
75 76
77 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
76 78
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 'IPython.parallel.apps.ipclusterapp'
79 ))
79 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
80 80
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 'IPython.parallel.apps.ipengineapp'
83 ))
81 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
84 82
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 'IPython.parallel.apps.ipcontrollerapp'
87 ))
83 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
88 84
89 85 #-----------------------------------------------------------------------------
90 86 # Base launchers and errors
91 87 #-----------------------------------------------------------------------------
92 88
93 89
94 90 class LauncherError(Exception):
95 91 pass
96 92
97 93
98 94 class ProcessStateError(LauncherError):
99 95 pass
100 96
101 97
102 98 class UnknownStatus(LauncherError):
103 99 pass
104 100
105 101
106 102 class BaseLauncher(LoggingConfigurable):
107 103 """An asbtraction for starting, stopping and signaling a process."""
108 104
109 105 # In all of the launchers, the work_dir is where child processes will be
110 106 # run. This will usually be the profile_dir, but may not be. any work_dir
111 107 # passed into the __init__ method will override the config value.
112 108 # This should not be used to set the work_dir for the actual engine
113 109 # and controller. Instead, use their own config files or the
114 110 # controller_args, engine_args attributes of the launchers to add
115 111 # the work_dir option.
116 112 work_dir = Unicode(u'.')
117 113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 114
119 115 start_data = Any()
120 116 stop_data = Any()
121 117
122 118 def _loop_default(self):
123 119 return ioloop.IOLoop.instance()
124 120
125 121 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 123 self.state = 'before' # can be before, running, after
128 124 self.stop_callbacks = []
129 125 self.start_data = None
130 126 self.stop_data = None
131 127
132 128 @property
133 129 def args(self):
134 130 """A list of cmd and args that will be used to start the process.
135 131
136 132 This is what is passed to :func:`spawnProcess` and the first element
137 133 will be the process name.
138 134 """
139 135 return self.find_args()
140 136
141 137 def find_args(self):
142 138 """The ``.args`` property calls this to find the args list.
143 139
144 140 Subcommand should implement this to construct the cmd and args.
145 141 """
146 142 raise NotImplementedError('find_args must be implemented in a subclass')
147 143
148 144 @property
149 145 def arg_str(self):
150 146 """The string form of the program arguments."""
151 147 return ' '.join(self.args)
152 148
153 149 @property
154 150 def running(self):
155 151 """Am I running."""
156 152 if self.state == 'running':
157 153 return True
158 154 else:
159 155 return False
160 156
161 157 def start(self):
162 158 """Start the process."""
163 159 raise NotImplementedError('start must be implemented in a subclass')
164 160
165 161 def stop(self):
166 162 """Stop the process and notify observers of stopping.
167 163
168 164 This method will return None immediately.
169 165 To observe the actual process stopping, see :meth:`on_stop`.
170 166 """
171 167 raise NotImplementedError('stop must be implemented in a subclass')
172 168
173 169 def on_stop(self, f):
174 170 """Register a callback to be called with this Launcher's stop_data
175 171 when the process actually finishes.
176 172 """
177 173 if self.state=='after':
178 174 return f(self.stop_data)
179 175 else:
180 176 self.stop_callbacks.append(f)
181 177
182 178 def notify_start(self, data):
183 179 """Call this to trigger startup actions.
184 180
185 181 This logs the process startup and sets the state to 'running'. It is
186 182 a pass-through so it can be used as a callback.
187 183 """
188 184
189 185 self.log.debug('Process %r started: %r', self.args[0], data)
190 186 self.start_data = data
191 187 self.state = 'running'
192 188 return data
193 189
194 190 def notify_stop(self, data):
195 191 """Call this to trigger process stop actions.
196 192
197 193 This logs the process stopping and sets the state to 'after'. Call
198 194 this to trigger callbacks registered via :meth:`on_stop`."""
199 195
200 196 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 197 self.stop_data = data
202 198 self.state = 'after'
203 199 for i in range(len(self.stop_callbacks)):
204 200 d = self.stop_callbacks.pop()
205 201 d(data)
206 202 return data
207 203
208 204 def signal(self, sig):
209 205 """Signal the process.
210 206
211 207 Parameters
212 208 ----------
213 209 sig : str or int
214 210 'KILL', 'INT', etc., or any signal number
215 211 """
216 212 raise NotImplementedError('signal must be implemented in a subclass')
217 213
218 214 class ClusterAppMixin(HasTraits):
219 215 """MixIn for cluster args as traits"""
220 216 profile_dir=Unicode('')
221 217 cluster_id=Unicode('')
222 218
223 219 @property
224 220 def cluster_args(self):
225 221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
226 222
227 223 class ControllerMixin(ClusterAppMixin):
228 224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
229 225 help="""Popen command to launch ipcontroller.""")
230 226 # Command line arguments to ipcontroller.
231 227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
232 228 help="""command-line args to pass to ipcontroller""")
233 229
234 230 class EngineMixin(ClusterAppMixin):
235 231 engine_cmd = List(ipengine_cmd_argv, config=True,
236 232 help="""command to launch the Engine.""")
237 233 # Command line arguments for ipengine.
238 234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
239 235 help="command-line arguments to pass to ipengine"
240 236 )
241 237
242 238
243 239 #-----------------------------------------------------------------------------
244 240 # Local process launchers
245 241 #-----------------------------------------------------------------------------
246 242
247 243
248 244 class LocalProcessLauncher(BaseLauncher):
249 245 """Start and stop an external process in an asynchronous manner.
250 246
251 247 This will launch the external process with a working directory of
252 248 ``self.work_dir``.
253 249 """
254 250
255 251 # This is used to to construct self.args, which is passed to
256 252 # spawnProcess.
257 253 cmd_and_args = List([])
258 254 poll_frequency = Integer(100) # in ms
259 255
260 256 def __init__(self, work_dir=u'.', config=None, **kwargs):
261 257 super(LocalProcessLauncher, self).__init__(
262 258 work_dir=work_dir, config=config, **kwargs
263 259 )
264 260 self.process = None
265 261 self.poller = None
266 262
267 263 def find_args(self):
268 264 return self.cmd_and_args
269 265
270 266 def start(self):
271 267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
272 268 if self.state == 'before':
273 269 self.process = Popen(self.args,
274 270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
275 271 env=os.environ,
276 272 cwd=self.work_dir
277 273 )
278 274 if WINDOWS:
279 275 self.stdout = forward_read_events(self.process.stdout)
280 276 self.stderr = forward_read_events(self.process.stderr)
281 277 else:
282 278 self.stdout = self.process.stdout.fileno()
283 279 self.stderr = self.process.stderr.fileno()
284 280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
285 281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
286 282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
287 283 self.poller.start()
288 284 self.notify_start(self.process.pid)
289 285 else:
290 286 s = 'The process was already started and has state: %r' % self.state
291 287 raise ProcessStateError(s)
292 288
293 289 def stop(self):
294 290 return self.interrupt_then_kill()
295 291
296 292 def signal(self, sig):
297 293 if self.state == 'running':
298 294 if WINDOWS and sig != SIGINT:
299 295 # use Windows tree-kill for better child cleanup
300 296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
301 297 else:
302 298 self.process.send_signal(sig)
303 299
304 300 def interrupt_then_kill(self, delay=2.0):
305 301 """Send INT, wait a delay and then send KILL."""
306 302 try:
307 303 self.signal(SIGINT)
308 304 except Exception:
309 305 self.log.debug("interrupt failed")
310 306 pass
311 307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
312 308 self.killer.start()
313 309
314 310 # callbacks, etc:
315 311
316 312 def handle_stdout(self, fd, events):
317 313 if WINDOWS:
318 314 line = self.stdout.recv()
319 315 else:
320 316 line = self.process.stdout.readline()
321 317 # a stopped process will be readable but return empty strings
322 318 if line:
323 319 self.log.debug(line[:-1])
324 320 else:
325 321 self.poll()
326 322
327 323 def handle_stderr(self, fd, events):
328 324 if WINDOWS:
329 325 line = self.stderr.recv()
330 326 else:
331 327 line = self.process.stderr.readline()
332 328 # a stopped process will be readable but return empty strings
333 329 if line:
334 330 self.log.debug(line[:-1])
335 331 else:
336 332 self.poll()
337 333
338 334 def poll(self):
339 335 status = self.process.poll()
340 336 if status is not None:
341 337 self.poller.stop()
342 338 self.loop.remove_handler(self.stdout)
343 339 self.loop.remove_handler(self.stderr)
344 340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
345 341 return status
346 342
347 343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
348 344 """Launch a controller as a regular external process."""
349 345
350 346 def find_args(self):
351 347 return self.controller_cmd + self.cluster_args + self.controller_args
352 348
353 349 def start(self):
354 350 """Start the controller by profile_dir."""
355 351 return super(LocalControllerLauncher, self).start()
356 352
357 353
358 354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
359 355 """Launch a single engine as a regular externall process."""
360 356
361 357 def find_args(self):
362 358 return self.engine_cmd + self.cluster_args + self.engine_args
363 359
364 360
365 361 class LocalEngineSetLauncher(LocalEngineLauncher):
366 362 """Launch a set of engines as regular external processes."""
367 363
368 364 delay = CFloat(0.1, config=True,
369 365 help="""delay (in seconds) between starting each engine after the first.
370 366 This can help force the engines to get their ids in order, or limit
371 367 process flood when starting many engines."""
372 368 )
373 369
374 370 # launcher class
375 371 launcher_class = LocalEngineLauncher
376 372
377 373 launchers = Dict()
378 374 stop_data = Dict()
379 375
380 376 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 377 super(LocalEngineSetLauncher, self).__init__(
382 378 work_dir=work_dir, config=config, **kwargs
383 379 )
384 380 self.stop_data = {}
385 381
386 382 def start(self, n):
387 383 """Start n engines by profile or profile_dir."""
388 384 dlist = []
389 385 for i in range(n):
390 386 if i > 0:
391 387 time.sleep(self.delay)
392 388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
393 389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
394 390 )
395 391
396 392 # Copy the engine args over to each engine launcher.
397 393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
398 394 el.engine_args = copy.deepcopy(self.engine_args)
399 395 el.on_stop(self._notice_engine_stopped)
400 396 d = el.start()
401 397 self.launchers[i] = el
402 398 dlist.append(d)
403 399 self.notify_start(dlist)
404 400 return dlist
405 401
406 402 def find_args(self):
407 403 return ['engine set']
408 404
409 405 def signal(self, sig):
410 406 dlist = []
411 407 for el in self.launchers.itervalues():
412 408 d = el.signal(sig)
413 409 dlist.append(d)
414 410 return dlist
415 411
416 412 def interrupt_then_kill(self, delay=1.0):
417 413 dlist = []
418 414 for el in self.launchers.itervalues():
419 415 d = el.interrupt_then_kill(delay)
420 416 dlist.append(d)
421 417 return dlist
422 418
423 419 def stop(self):
424 420 return self.interrupt_then_kill()
425 421
426 422 def _notice_engine_stopped(self, data):
427 423 pid = data['pid']
428 424 for idx,el in self.launchers.iteritems():
429 425 if el.process.pid == pid:
430 426 break
431 427 self.launchers.pop(idx)
432 428 self.stop_data[idx] = data
433 429 if not self.launchers:
434 430 self.notify_stop(self.stop_data)
435 431
436 432
437 433 #-----------------------------------------------------------------------------
438 434 # MPI launchers
439 435 #-----------------------------------------------------------------------------
440 436
441 437
442 438 class MPILauncher(LocalProcessLauncher):
443 439 """Launch an external process using mpiexec."""
444 440
445 441 mpi_cmd = List(['mpiexec'], config=True,
446 442 help="The mpiexec command to use in starting the process."
447 443 )
448 444 mpi_args = List([], config=True,
449 445 help="The command line arguments to pass to mpiexec."
450 446 )
451 447 program = List(['date'],
452 448 help="The program to start via mpiexec.")
453 449 program_args = List([],
454 450 help="The command line argument to the program."
455 451 )
456 452 n = Integer(1)
457 453
458 454 def __init__(self, *args, **kwargs):
459 455 # deprecation for old MPIExec names:
460 456 config = kwargs.get('config', {})
461 457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
462 458 deprecated = config.get(oldname)
463 459 if deprecated:
464 460 newname = oldname.replace('MPIExec', 'MPI')
465 461 config[newname].update(deprecated)
466 462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
467 463
468 464 super(MPILauncher, self).__init__(*args, **kwargs)
469 465
470 466 def find_args(self):
471 467 """Build self.args using all the fields."""
472 468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
473 469 self.program + self.program_args
474 470
475 471 def start(self, n):
476 472 """Start n instances of the program using mpiexec."""
477 473 self.n = n
478 474 return super(MPILauncher, self).start()
479 475
480 476
481 477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
482 478 """Launch a controller using mpiexec."""
483 479
484 480 # alias back to *non-configurable* program[_args] for use in find_args()
485 481 # this way all Controller/EngineSetLaunchers have the same form, rather
486 482 # than *some* having `program_args` and others `controller_args`
487 483 @property
488 484 def program(self):
489 485 return self.controller_cmd
490 486
491 487 @property
492 488 def program_args(self):
493 489 return self.cluster_args + self.controller_args
494 490
495 491 def start(self):
496 492 """Start the controller by profile_dir."""
497 493 return super(MPIControllerLauncher, self).start(1)
498 494
499 495
500 496 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
501 497 """Launch engines using mpiexec"""
502 498
503 499 # alias back to *non-configurable* program[_args] for use in find_args()
504 500 # this way all Controller/EngineSetLaunchers have the same form, rather
505 501 # than *some* having `program_args` and others `controller_args`
506 502 @property
507 503 def program(self):
508 504 return self.engine_cmd
509 505
510 506 @property
511 507 def program_args(self):
512 508 return self.cluster_args + self.engine_args
513 509
514 510 def start(self, n):
515 511 """Start n engines by profile or profile_dir."""
516 512 self.n = n
517 513 return super(MPIEngineSetLauncher, self).start(n)
518 514
519 515 # deprecated MPIExec names
520 516 class DeprecatedMPILauncher(object):
521 517 def warn(self):
522 518 oldname = self.__class__.__name__
523 519 newname = oldname.replace('MPIExec', 'MPI')
524 520 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
525 521
526 522 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
527 523 """Deprecated, use MPILauncher"""
528 524 def __init__(self, *args, **kwargs):
529 525 super(MPIExecLauncher, self).__init__(*args, **kwargs)
530 526 self.warn()
531 527
532 528 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
533 529 """Deprecated, use MPIControllerLauncher"""
534 530 def __init__(self, *args, **kwargs):
535 531 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
536 532 self.warn()
537 533
538 534 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
539 535 """Deprecated, use MPIEngineSetLauncher"""
540 536 def __init__(self, *args, **kwargs):
541 537 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
542 538 self.warn()
543 539
544 540
545 541 #-----------------------------------------------------------------------------
546 542 # SSH launchers
547 543 #-----------------------------------------------------------------------------
548 544
549 545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
550 546
551 547 class SSHLauncher(LocalProcessLauncher):
552 548 """A minimal launcher for ssh.
553 549
554 550 To be useful this will probably have to be extended to use the ``sshx``
555 551 idea for environment variables. There could be other things this needs
556 552 as well.
557 553 """
558 554
559 555 ssh_cmd = List(['ssh'], config=True,
560 556 help="command for starting ssh")
561 557 ssh_args = List(['-tt'], config=True,
562 558 help="args to pass to ssh")
563 559 scp_cmd = List(['scp'], config=True,
564 560 help="command for sending files")
565 561 program = List(['date'],
566 562 help="Program to launch via ssh")
567 563 program_args = List([],
568 564 help="args to pass to remote program")
569 565 hostname = Unicode('', config=True,
570 566 help="hostname on which to launch the program")
571 567 user = Unicode('', config=True,
572 568 help="username for ssh")
573 569 location = Unicode('', config=True,
574 570 help="user@hostname location for ssh in one setting")
575 571 to_fetch = List([], config=True,
576 572 help="List of (remote, local) files to fetch after starting")
577 573 to_send = List([], config=True,
578 574 help="List of (local, remote) files to send before starting")
579 575
580 576 def _hostname_changed(self, name, old, new):
581 577 if self.user:
582 578 self.location = u'%s@%s' % (self.user, new)
583 579 else:
584 580 self.location = new
585 581
586 582 def _user_changed(self, name, old, new):
587 583 self.location = u'%s@%s' % (new, self.hostname)
588 584
589 585 def find_args(self):
590 586 return self.ssh_cmd + self.ssh_args + [self.location] + \
591 587 self.program + self.program_args
592 588
593 589 def _send_file(self, local, remote):
594 590 """send a single file"""
595 591 remote = "%s:%s" % (self.location, remote)
596 592 for i in range(10):
597 593 if not os.path.exists(local):
598 594 self.log.debug("waiting for %s" % local)
599 595 time.sleep(1)
600 596 else:
601 597 break
602 598 self.log.info("sending %s to %s", local, remote)
603 599 check_output(self.scp_cmd + [local, remote])
604 600
605 601 def send_files(self):
606 602 """send our files (called before start)"""
607 603 if not self.to_send:
608 604 return
609 605 for local_file, remote_file in self.to_send:
610 606 self._send_file(local_file, remote_file)
611 607
612 608 def _fetch_file(self, remote, local):
613 609 """fetch a single file"""
614 610 full_remote = "%s:%s" % (self.location, remote)
615 611 self.log.info("fetching %s from %s", local, full_remote)
616 612 for i in range(10):
617 613 # wait up to 10s for remote file to exist
618 614 check = check_output(self.ssh_cmd + self.ssh_args + \
619 615 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
620 616 check = check.strip()
621 617 if check == 'no':
622 618 time.sleep(1)
623 619 elif check == 'yes':
624 620 break
625 621 check_output(self.scp_cmd + [full_remote, local])
626 622
627 623 def fetch_files(self):
628 624 """fetch remote files (called after start)"""
629 625 if not self.to_fetch:
630 626 return
631 627 for remote_file, local_file in self.to_fetch:
632 628 self._fetch_file(remote_file, local_file)
633 629
634 630 def start(self, hostname=None, user=None):
635 631 if hostname is not None:
636 632 self.hostname = hostname
637 633 if user is not None:
638 634 self.user = user
639 635
640 636 self.send_files()
641 637 super(SSHLauncher, self).start()
642 638 self.fetch_files()
643 639
644 640 def signal(self, sig):
645 641 if self.state == 'running':
646 642 # send escaped ssh connection-closer
647 643 self.process.stdin.write('~.')
648 644 self.process.stdin.flush()
649 645
650 646 class SSHClusterLauncher(SSHLauncher):
651 647
652 648 remote_profile_dir = Unicode('', config=True,
653 649 help="""The remote profile_dir to use.
654 650
655 651 If not specified, use calling profile, stripping out possible leading homedir.
656 652 """)
657 653
658 654 def _remote_profie_dir_default(self):
659 655 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
660 656 """
661 657 home = get_home_dir()
662 658 if not home.endswith('/'):
663 659 home = home+'/'
664 660
665 661 if self.profile_dir.startswith(home):
666 662 return self.profile_dir[len(home):]
667 663 else:
668 664 return self.profile_dir
669 665
670 666 def _cluster_id_changed(self, name, old, new):
671 667 if new:
672 668 raise ValueError("cluster id not supported by SSH launchers")
673 669
674 670 @property
675 671 def cluster_args(self):
676 672 return ['--profile-dir', self.remote_profile_dir]
677 673
678 674 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
679 675
680 676 # alias back to *non-configurable* program[_args] for use in find_args()
681 677 # this way all Controller/EngineSetLaunchers have the same form, rather
682 678 # than *some* having `program_args` and others `controller_args`
683 679
684 680 def _controller_cmd_default(self):
685 681 return ['ipcontroller']
686 682
687 683 @property
688 684 def program(self):
689 685 return self.controller_cmd
690 686
691 687 @property
692 688 def program_args(self):
693 689 return self.cluster_args + self.controller_args
694 690
695 691 def _to_fetch_default(self):
696 692 return [
697 693 (os.path.join(self.remote_profile_dir, 'security', cf),
698 694 os.path.join(self.profile_dir, 'security', cf),)
699 695 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
700 696 ]
701 697
702 698 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
703 699
704 700 # alias back to *non-configurable* program[_args] for use in find_args()
705 701 # this way all Controller/EngineSetLaunchers have the same form, rather
706 702 # than *some* having `program_args` and others `controller_args`
707 703
708 704 def _engine_cmd_default(self):
709 705 return ['ipengine']
710 706
711 707 @property
712 708 def program(self):
713 709 return self.engine_cmd
714 710
715 711 @property
716 712 def program_args(self):
717 713 return self.cluster_args + self.engine_args
718 714
719 715 def _to_send_default(self):
720 716 return [
721 717 (os.path.join(self.profile_dir, 'security', cf),
722 718 os.path.join(self.remote_profile_dir, 'security', cf))
723 719 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
724 720 ]
725 721
726 722
727 723 class SSHEngineSetLauncher(LocalEngineSetLauncher):
728 724 launcher_class = SSHEngineLauncher
729 725 engines = Dict(config=True,
730 726 help="""dict of engines to launch. This is a dict by hostname of ints,
731 727 corresponding to the number of engines to start on that host.""")
732 728
733 729 @property
734 730 def engine_count(self):
735 731 """determine engine count from `engines` dict"""
736 732 count = 0
737 733 for n in self.engines.itervalues():
738 734 if isinstance(n, (tuple,list)):
739 735 n,args = n
740 736 count += n
741 737 return count
742 738
743 739 def start(self, n):
744 740 """Start engines by profile or profile_dir.
745 741 `n` is ignored, and the `engines` config property is used instead.
746 742 """
747 743
748 744 dlist = []
749 745 for host, n in self.engines.iteritems():
750 746 if isinstance(n, (tuple, list)):
751 747 n, args = n
752 748 else:
753 749 args = copy.deepcopy(self.engine_args)
754 750
755 751 if '@' in host:
756 752 user,host = host.split('@',1)
757 753 else:
758 754 user=None
759 755 for i in range(n):
760 756 if i > 0:
761 757 time.sleep(self.delay)
762 758 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
763 759 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
764 760 )
765 761 if i > 0:
766 762 # only send files for the first engine on each host
767 763 el.to_send = []
768 764
769 765 # Copy the engine args over to each engine launcher.
770 766 el.engine_cmd = self.engine_cmd
771 767 el.engine_args = args
772 768 el.on_stop(self._notice_engine_stopped)
773 769 d = el.start(user=user, hostname=host)
774 770 self.launchers[ "%s/%i" % (host,i) ] = el
775 771 dlist.append(d)
776 772 self.notify_start(dlist)
777 773 return dlist
778 774
779 775
780 776 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
781 777 """Launcher for calling
782 778 `ipcluster engines` on a remote machine.
783 779
784 780 Requires that remote profile is already configured.
785 781 """
786 782
787 783 n = Integer()
788 784 ipcluster_cmd = List(['ipcluster'], config=True)
789 785
790 786 @property
791 787 def program(self):
792 788 return self.ipcluster_cmd + ['engines']
793 789
794 790 @property
795 791 def program_args(self):
796 792 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
797 793
798 794 def _to_send_default(self):
799 795 return [
800 796 (os.path.join(self.profile_dir, 'security', cf),
801 797 os.path.join(self.remote_profile_dir, 'security', cf))
802 798 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
803 799 ]
804 800
805 801 def start(self, n):
806 802 self.n = n
807 803 super(SSHProxyEngineSetLauncher, self).start()
808 804
809 805
810 806 #-----------------------------------------------------------------------------
811 807 # Windows HPC Server 2008 scheduler launchers
812 808 #-----------------------------------------------------------------------------
813 809
814 810
815 811 # This is only used on Windows.
816 812 def find_job_cmd():
817 813 if WINDOWS:
818 814 try:
819 815 return find_cmd('job')
820 816 except (FindCmdError, ImportError):
821 817 # ImportError will be raised if win32api is not installed
822 818 return 'job'
823 819 else:
824 820 return 'job'
825 821
826 822
827 823 class WindowsHPCLauncher(BaseLauncher):
828 824
829 825 job_id_regexp = Unicode(r'\d+', config=True,
830 826 help="""A regular expression used to get the job id from the output of the
831 827 submit_command. """
832 828 )
833 829 job_file_name = Unicode(u'ipython_job.xml', config=True,
834 830 help="The filename of the instantiated job script.")
835 831 # The full path to the instantiated job script. This gets made dynamically
836 832 # by combining the work_dir with the job_file_name.
837 833 job_file = Unicode(u'')
838 834 scheduler = Unicode('', config=True,
839 835 help="The hostname of the scheduler to submit the job to.")
840 836 job_cmd = Unicode(find_job_cmd(), config=True,
841 837 help="The command for submitting jobs.")
842 838
843 839 def __init__(self, work_dir=u'.', config=None, **kwargs):
844 840 super(WindowsHPCLauncher, self).__init__(
845 841 work_dir=work_dir, config=config, **kwargs
846 842 )
847 843
848 844 @property
849 845 def job_file(self):
850 846 return os.path.join(self.work_dir, self.job_file_name)
851 847
852 848 def write_job_file(self, n):
853 849 raise NotImplementedError("Implement write_job_file in a subclass.")
854 850
855 851 def find_args(self):
856 852 return [u'job.exe']
857 853
858 854 def parse_job_id(self, output):
859 855 """Take the output of the submit command and return the job id."""
860 856 m = re.search(self.job_id_regexp, output)
861 857 if m is not None:
862 858 job_id = m.group()
863 859 else:
864 860 raise LauncherError("Job id couldn't be determined: %s" % output)
865 861 self.job_id = job_id
866 862 self.log.info('Job started with id: %r', job_id)
867 863 return job_id
868 864
869 865 def start(self, n):
870 866 """Start n copies of the process using the Win HPC job scheduler."""
871 867 self.write_job_file(n)
872 868 args = [
873 869 'submit',
874 870 '/jobfile:%s' % self.job_file,
875 871 '/scheduler:%s' % self.scheduler
876 872 ]
877 873 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
878 874
879 875 output = check_output([self.job_cmd]+args,
880 876 env=os.environ,
881 877 cwd=self.work_dir,
882 878 stderr=STDOUT
883 879 )
884 880 job_id = self.parse_job_id(output)
885 881 self.notify_start(job_id)
886 882 return job_id
887 883
888 884 def stop(self):
889 885 args = [
890 886 'cancel',
891 887 self.job_id,
892 888 '/scheduler:%s' % self.scheduler
893 889 ]
894 890 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
895 891 try:
896 892 output = check_output([self.job_cmd]+args,
897 893 env=os.environ,
898 894 cwd=self.work_dir,
899 895 stderr=STDOUT
900 896 )
901 897 except:
902 898 output = 'The job already appears to be stoppped: %r' % self.job_id
903 899 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
904 900 return output
905 901
906 902
907 903 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
908 904
909 905 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
910 906 help="WinHPC xml job file.")
911 907 controller_args = List([], config=False,
912 908 help="extra args to pass to ipcontroller")
913 909
914 910 def write_job_file(self, n):
915 911 job = IPControllerJob(config=self.config)
916 912
917 913 t = IPControllerTask(config=self.config)
918 914 # The tasks work directory is *not* the actual work directory of
919 915 # the controller. It is used as the base path for the stdout/stderr
920 916 # files that the scheduler redirects to.
921 917 t.work_directory = self.profile_dir
922 918 # Add the profile_dir and from self.start().
923 919 t.controller_args.extend(self.cluster_args)
924 920 t.controller_args.extend(self.controller_args)
925 921 job.add_task(t)
926 922
927 923 self.log.debug("Writing job description file: %s", self.job_file)
928 924 job.write(self.job_file)
929 925
930 926 @property
931 927 def job_file(self):
932 928 return os.path.join(self.profile_dir, self.job_file_name)
933 929
934 930 def start(self):
935 931 """Start the controller by profile_dir."""
936 932 return super(WindowsHPCControllerLauncher, self).start(1)
937 933
938 934
939 935 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
940 936
941 937 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
942 938 help="jobfile for ipengines job")
943 939 engine_args = List([], config=False,
944 940 help="extra args to pas to ipengine")
945 941
946 942 def write_job_file(self, n):
947 943 job = IPEngineSetJob(config=self.config)
948 944
949 945 for i in range(n):
950 946 t = IPEngineTask(config=self.config)
951 947 # The tasks work directory is *not* the actual work directory of
952 948 # the engine. It is used as the base path for the stdout/stderr
953 949 # files that the scheduler redirects to.
954 950 t.work_directory = self.profile_dir
955 951 # Add the profile_dir and from self.start().
956 952 t.engine_args.extend(self.cluster_args)
957 953 t.engine_args.extend(self.engine_args)
958 954 job.add_task(t)
959 955
960 956 self.log.debug("Writing job description file: %s", self.job_file)
961 957 job.write(self.job_file)
962 958
963 959 @property
964 960 def job_file(self):
965 961 return os.path.join(self.profile_dir, self.job_file_name)
966 962
967 963 def start(self, n):
968 964 """Start the controller by profile_dir."""
969 965 return super(WindowsHPCEngineSetLauncher, self).start(n)
970 966
971 967
972 968 #-----------------------------------------------------------------------------
973 969 # Batch (PBS) system launchers
974 970 #-----------------------------------------------------------------------------
975 971
976 972 class BatchClusterAppMixin(ClusterAppMixin):
977 973 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
978 974 def _profile_dir_changed(self, name, old, new):
979 975 self.context[name] = new
980 976 _cluster_id_changed = _profile_dir_changed
981 977
982 978 def _profile_dir_default(self):
983 979 self.context['profile_dir'] = ''
984 980 return ''
985 981 def _cluster_id_default(self):
986 982 self.context['cluster_id'] = ''
987 983 return ''
988 984
989 985
990 986 class BatchSystemLauncher(BaseLauncher):
991 987 """Launch an external process using a batch system.
992 988
993 989 This class is designed to work with UNIX batch systems like PBS, LSF,
994 990 GridEngine, etc. The overall model is that there are different commands
995 991 like qsub, qdel, etc. that handle the starting and stopping of the process.
996 992
997 993 This class also has the notion of a batch script. The ``batch_template``
998 994 attribute can be set to a string that is a template for the batch script.
999 995 This template is instantiated using string formatting. Thus the template can
1000 996 use {n} fot the number of instances. Subclasses can add additional variables
1001 997 to the template dict.
1002 998 """
1003 999
1004 1000 # Subclasses must fill these in. See PBSEngineSet
1005 1001 submit_command = List([''], config=True,
1006 1002 help="The name of the command line program used to submit jobs.")
1007 1003 delete_command = List([''], config=True,
1008 1004 help="The name of the command line program used to delete jobs.")
1009 1005 job_id_regexp = Unicode('', config=True,
1010 1006 help="""A regular expression used to get the job id from the output of the
1011 1007 submit_command.""")
1012 1008 batch_template = Unicode('', config=True,
1013 1009 help="The string that is the batch script template itself.")
1014 1010 batch_template_file = Unicode(u'', config=True,
1015 1011 help="The file that contains the batch template.")
1016 1012 batch_file_name = Unicode(u'batch_script', config=True,
1017 1013 help="The filename of the instantiated batch script.")
1018 1014 queue = Unicode(u'', config=True,
1019 1015 help="The PBS Queue.")
1020 1016
1021 1017 def _queue_changed(self, name, old, new):
1022 1018 self.context[name] = new
1023 1019
1024 1020 n = Integer(1)
1025 1021 _n_changed = _queue_changed
1026 1022
1027 1023 # not configurable, override in subclasses
1028 1024 # PBS Job Array regex
1029 1025 job_array_regexp = Unicode('')
1030 1026 job_array_template = Unicode('')
1031 1027 # PBS Queue regex
1032 1028 queue_regexp = Unicode('')
1033 1029 queue_template = Unicode('')
1034 1030 # The default batch template, override in subclasses
1035 1031 default_template = Unicode('')
1036 1032 # The full path to the instantiated batch script.
1037 1033 batch_file = Unicode(u'')
1038 1034 # the format dict used with batch_template:
1039 1035 context = Dict()
1040 1036 def _context_default(self):
1041 1037 """load the default context with the default values for the basic keys
1042 1038
1043 1039 because the _trait_changed methods only load the context if they
1044 1040 are set to something other than the default value.
1045 1041 """
1046 1042 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1047 1043
1048 1044 # the Formatter instance for rendering the templates:
1049 1045 formatter = Instance(EvalFormatter, (), {})
1050 1046
1051 1047
1052 1048 def find_args(self):
1053 1049 return self.submit_command + [self.batch_file]
1054 1050
1055 1051 def __init__(self, work_dir=u'.', config=None, **kwargs):
1056 1052 super(BatchSystemLauncher, self).__init__(
1057 1053 work_dir=work_dir, config=config, **kwargs
1058 1054 )
1059 1055 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1060 1056
1061 1057 def parse_job_id(self, output):
1062 1058 """Take the output of the submit command and return the job id."""
1063 1059 m = re.search(self.job_id_regexp, output)
1064 1060 if m is not None:
1065 1061 job_id = m.group()
1066 1062 else:
1067 1063 raise LauncherError("Job id couldn't be determined: %s" % output)
1068 1064 self.job_id = job_id
1069 1065 self.log.info('Job submitted with job id: %r', job_id)
1070 1066 return job_id
1071 1067
1072 1068 def write_batch_script(self, n):
1073 1069 """Instantiate and write the batch script to the work_dir."""
1074 1070 self.n = n
1075 1071 # first priority is batch_template if set
1076 1072 if self.batch_template_file and not self.batch_template:
1077 1073 # second priority is batch_template_file
1078 1074 with open(self.batch_template_file) as f:
1079 1075 self.batch_template = f.read()
1080 1076 if not self.batch_template:
1081 1077 # third (last) priority is default_template
1082 1078 self.batch_template = self.default_template
1083 1079
1084 1080 # add jobarray or queue lines to user-specified template
1085 1081 # note that this is *only* when user did not specify a template.
1086 1082 regex = re.compile(self.job_array_regexp)
1087 1083 # print regex.search(self.batch_template)
1088 1084 if not regex.search(self.batch_template):
1089 1085 self.log.debug("adding job array settings to batch script")
1090 1086 firstline, rest = self.batch_template.split('\n',1)
1091 1087 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1092 1088
1093 1089 regex = re.compile(self.queue_regexp)
1094 1090 # print regex.search(self.batch_template)
1095 1091 if self.queue and not regex.search(self.batch_template):
1096 1092 self.log.debug("adding PBS queue settings to batch script")
1097 1093 firstline, rest = self.batch_template.split('\n',1)
1098 1094 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1099 1095
1100 1096 script_as_string = self.formatter.format(self.batch_template, **self.context)
1101 1097 self.log.debug('Writing batch script: %s', self.batch_file)
1102 1098
1103 1099 with open(self.batch_file, 'w') as f:
1104 1100 f.write(script_as_string)
1105 1101 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1106 1102
1107 1103 def start(self, n):
1108 1104 """Start n copies of the process using a batch system."""
1109 1105 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1110 1106 # Here we save profile_dir in the context so they
1111 1107 # can be used in the batch script template as {profile_dir}
1112 1108 self.write_batch_script(n)
1113 1109 output = check_output(self.args, env=os.environ)
1114 1110
1115 1111 job_id = self.parse_job_id(output)
1116 1112 self.notify_start(job_id)
1117 1113 return job_id
1118 1114
1119 1115 def stop(self):
1120 1116 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1121 1117 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1122 1118 return output
1123 1119
1124 1120
1125 1121 class PBSLauncher(BatchSystemLauncher):
1126 1122 """A BatchSystemLauncher subclass for PBS."""
1127 1123
1128 1124 submit_command = List(['qsub'], config=True,
1129 1125 help="The PBS submit command ['qsub']")
1130 1126 delete_command = List(['qdel'], config=True,
1131 1127 help="The PBS delete command ['qsub']")
1132 1128 job_id_regexp = Unicode(r'\d+', config=True,
1133 1129 help="Regular expresion for identifying the job ID [r'\d+']")
1134 1130
1135 1131 batch_file = Unicode(u'')
1136 1132 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
1137 1133 job_array_template = Unicode('#PBS -t 1-{n}')
1138 1134 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
1139 1135 queue_template = Unicode('#PBS -q {queue}')
1140 1136
1141 1137
1142 1138 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1143 1139 """Launch a controller using PBS."""
1144 1140
1145 1141 batch_file_name = Unicode(u'pbs_controller', config=True,
1146 1142 help="batch file name for the controller job.")
1147 1143 default_template= Unicode("""#!/bin/sh
1148 1144 #PBS -V
1149 1145 #PBS -N ipcontroller
1150 1146 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1151 1147 """%(' '.join(ipcontroller_cmd_argv)))
1152 1148
1153 1149
1154 1150 def start(self):
1155 1151 """Start the controller by profile or profile_dir."""
1156 1152 return super(PBSControllerLauncher, self).start(1)
1157 1153
1158 1154
1159 1155 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1160 1156 """Launch Engines using PBS"""
1161 1157 batch_file_name = Unicode(u'pbs_engines', config=True,
1162 1158 help="batch file name for the engine(s) job.")
1163 1159 default_template= Unicode(u"""#!/bin/sh
1164 1160 #PBS -V
1165 1161 #PBS -N ipengine
1166 1162 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1167 1163 """%(' '.join(ipengine_cmd_argv)))
1168 1164
1169 1165 def start(self, n):
1170 1166 """Start n engines by profile or profile_dir."""
1171 1167 return super(PBSEngineSetLauncher, self).start(n)
1172 1168
1173 1169 #SGE is very similar to PBS
1174 1170
1175 1171 class SGELauncher(PBSLauncher):
1176 1172 """Sun GridEngine is a PBS clone with slightly different syntax"""
1177 1173 job_array_regexp = Unicode('#\$\W+\-t')
1178 1174 job_array_template = Unicode('#$ -t 1-{n}')
1179 1175 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1180 1176 queue_template = Unicode('#$ -q {queue}')
1181 1177
1182 1178 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1183 1179 """Launch a controller using SGE."""
1184 1180
1185 1181 batch_file_name = Unicode(u'sge_controller', config=True,
1186 1182 help="batch file name for the ipontroller job.")
1187 1183 default_template= Unicode(u"""#$ -V
1188 1184 #$ -S /bin/sh
1189 1185 #$ -N ipcontroller
1190 1186 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1191 1187 """%(' '.join(ipcontroller_cmd_argv)))
1192 1188
1193 1189 def start(self):
1194 1190 """Start the controller by profile or profile_dir."""
1195 1191 return super(SGEControllerLauncher, self).start(1)
1196 1192
1197 1193 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1198 1194 """Launch Engines with SGE"""
1199 1195 batch_file_name = Unicode(u'sge_engines', config=True,
1200 1196 help="batch file name for the engine(s) job.")
1201 1197 default_template = Unicode("""#$ -V
1202 1198 #$ -S /bin/sh
1203 1199 #$ -N ipengine
1204 1200 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1205 1201 """%(' '.join(ipengine_cmd_argv)))
1206 1202
1207 1203 def start(self, n):
1208 1204 """Start n engines by profile or profile_dir."""
1209 1205 return super(SGEEngineSetLauncher, self).start(n)
1210 1206
1211 1207
1212 1208 # LSF launchers
1213 1209
1214 1210 class LSFLauncher(BatchSystemLauncher):
1215 1211 """A BatchSystemLauncher subclass for LSF."""
1216 1212
1217 1213 submit_command = List(['bsub'], config=True,
1218 1214 help="The PBS submit command ['bsub']")
1219 1215 delete_command = List(['bkill'], config=True,
1220 1216 help="The PBS delete command ['bkill']")
1221 1217 job_id_regexp = Unicode(r'\d+', config=True,
1222 1218 help="Regular expresion for identifying the job ID [r'\d+']")
1223 1219
1224 1220 batch_file = Unicode(u'')
1225 1221 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1226 1222 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1227 1223 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1228 1224 queue_template = Unicode('#BSUB -q {queue}')
1229 1225
1230 1226 def start(self, n):
1231 1227 """Start n copies of the process using LSF batch system.
1232 1228 This cant inherit from the base class because bsub expects
1233 1229 to be piped a shell script in order to honor the #BSUB directives :
1234 1230 bsub < script
1235 1231 """
1236 1232 # Here we save profile_dir in the context so they
1237 1233 # can be used in the batch script template as {profile_dir}
1238 1234 self.write_batch_script(n)
1239 1235 #output = check_output(self.args, env=os.environ)
1240 1236 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1241 1237 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1242 1238 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1243 1239 output,err = p.communicate()
1244 1240 job_id = self.parse_job_id(output)
1245 1241 self.notify_start(job_id)
1246 1242 return job_id
1247 1243
1248 1244
1249 1245 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1250 1246 """Launch a controller using LSF."""
1251 1247
1252 1248 batch_file_name = Unicode(u'lsf_controller', config=True,
1253 1249 help="batch file name for the controller job.")
1254 1250 default_template= Unicode("""#!/bin/sh
1255 1251 #BSUB -J ipcontroller
1256 1252 #BSUB -oo ipcontroller.o.%%J
1257 1253 #BSUB -eo ipcontroller.e.%%J
1258 1254 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1259 1255 """%(' '.join(ipcontroller_cmd_argv)))
1260 1256
1261 1257 def start(self):
1262 1258 """Start the controller by profile or profile_dir."""
1263 1259 return super(LSFControllerLauncher, self).start(1)
1264 1260
1265 1261
1266 1262 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1267 1263 """Launch Engines using LSF"""
1268 1264 batch_file_name = Unicode(u'lsf_engines', config=True,
1269 1265 help="batch file name for the engine(s) job.")
1270 1266 default_template= Unicode(u"""#!/bin/sh
1271 1267 #BSUB -oo ipengine.o.%%J
1272 1268 #BSUB -eo ipengine.e.%%J
1273 1269 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1274 1270 """%(' '.join(ipengine_cmd_argv)))
1275 1271
1276 1272 def start(self, n):
1277 1273 """Start n engines by profile or profile_dir."""
1278 1274 return super(LSFEngineSetLauncher, self).start(n)
1279 1275
1280 1276
1281 1277 #-----------------------------------------------------------------------------
1282 1278 # A launcher for ipcluster itself!
1283 1279 #-----------------------------------------------------------------------------
1284 1280
1285 1281
1286 1282 class IPClusterLauncher(LocalProcessLauncher):
1287 1283 """Launch the ipcluster program in an external process."""
1288 1284
1289 1285 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1290 1286 help="Popen command for ipcluster")
1291 1287 ipcluster_args = List(
1292 1288 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1293 1289 help="Command line arguments to pass to ipcluster.")
1294 1290 ipcluster_subcommand = Unicode('start')
1295 1291 profile = Unicode('default')
1296 1292 n = Integer(2)
1297 1293
1298 1294 def find_args(self):
1299 1295 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1300 1296 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1301 1297 self.ipcluster_args
1302 1298
1303 1299 def start(self):
1304 1300 return super(IPClusterLauncher, self).start()
1305 1301
1306 1302 #-----------------------------------------------------------------------------
1307 1303 # Collections of launchers
1308 1304 #-----------------------------------------------------------------------------
1309 1305
1310 1306 local_launchers = [
1311 1307 LocalControllerLauncher,
1312 1308 LocalEngineLauncher,
1313 1309 LocalEngineSetLauncher,
1314 1310 ]
1315 1311 mpi_launchers = [
1316 1312 MPILauncher,
1317 1313 MPIControllerLauncher,
1318 1314 MPIEngineSetLauncher,
1319 1315 ]
1320 1316 ssh_launchers = [
1321 1317 SSHLauncher,
1322 1318 SSHControllerLauncher,
1323 1319 SSHEngineLauncher,
1324 1320 SSHEngineSetLauncher,
1325 1321 ]
1326 1322 winhpc_launchers = [
1327 1323 WindowsHPCLauncher,
1328 1324 WindowsHPCControllerLauncher,
1329 1325 WindowsHPCEngineSetLauncher,
1330 1326 ]
1331 1327 pbs_launchers = [
1332 1328 PBSLauncher,
1333 1329 PBSControllerLauncher,
1334 1330 PBSEngineSetLauncher,
1335 1331 ]
1336 1332 sge_launchers = [
1337 1333 SGELauncher,
1338 1334 SGEControllerLauncher,
1339 1335 SGEEngineSetLauncher,
1340 1336 ]
1341 1337 lsf_launchers = [
1342 1338 LSFLauncher,
1343 1339 LSFControllerLauncher,
1344 1340 LSFEngineSetLauncher,
1345 1341 ]
1346 1342 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1347 1343 + pbs_launchers + sge_launchers + lsf_launchers
1348 1344
General Comments 0
You need to be logged in to leave comments. Login now