##// END OF EJS Templates
handle potentially absent win32api in launcher.py
MinRK -
Show More
@@ -1,971 +1,972
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23
24 24 from signal import SIGINT, SIGTERM
25 25 try:
26 26 from signal import SIGKILL
27 27 except ImportError:
28 28 SIGKILL=SIGTERM
29 29
30 30 from subprocess import Popen, PIPE, STDOUT
31 31 try:
32 32 from subprocess import check_output
33 33 except ImportError:
34 34 # pre-2.7, define check_output with Popen
35 35 def check_output(*args, **kwargs):
36 36 kwargs.update(dict(stdout=PIPE))
37 37 p = Popen(*args, **kwargs)
38 38 out,err = p.communicate()
39 39 return out
40 40
41 41 from zmq.eventloop import ioloop
42 42
43 43 from IPython.external import Itpl
44 44 # from IPython.config.configurable import Configurable
45 45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
46 46 from IPython.utils.path import get_ipython_module_path
47 47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
48 48
49 49 from IPython.parallel.factory import LoggingFactory
50 50
51 51 # load winhpcjob only on Windows
52 52 try:
53 53 from .winhpcjob import (
54 54 IPControllerTask, IPEngineTask,
55 55 IPControllerJob, IPEngineSetJob
56 56 )
57 57 except ImportError:
58 58 pass
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Paths to the kernel apps
63 63 #-----------------------------------------------------------------------------
64 64
65 65
66 66 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
67 67 'IPython.parallel.apps.ipclusterapp'
68 68 ))
69 69
70 70 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
71 71 'IPython.parallel.apps.ipengineapp'
72 72 ))
73 73
74 74 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
75 75 'IPython.parallel.apps.ipcontrollerapp'
76 76 ))
77 77
78 78 #-----------------------------------------------------------------------------
79 79 # Base launchers and errors
80 80 #-----------------------------------------------------------------------------
81 81
82 82
83 83 class LauncherError(Exception):
84 84 pass
85 85
86 86
87 87 class ProcessStateError(LauncherError):
88 88 pass
89 89
90 90
91 91 class UnknownStatus(LauncherError):
92 92 pass
93 93
94 94
95 95 class BaseLauncher(LoggingFactory):
96 96 """An asbtraction for starting, stopping and signaling a process."""
97 97
98 98 # In all of the launchers, the work_dir is where child processes will be
99 99 # run. This will usually be the cluster_dir, but may not be. any work_dir
100 100 # passed into the __init__ method will override the config value.
101 101 # This should not be used to set the work_dir for the actual engine
102 102 # and controller. Instead, use their own config files or the
103 103 # controller_args, engine_args attributes of the launchers to add
104 104 # the --work-dir option.
105 105 work_dir = Unicode(u'.')
106 106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
107 107
108 108 start_data = Any()
109 109 stop_data = Any()
110 110
111 111 def _loop_default(self):
112 112 return ioloop.IOLoop.instance()
113 113
114 114 def __init__(self, work_dir=u'.', config=None, **kwargs):
115 115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
116 116 self.state = 'before' # can be before, running, after
117 117 self.stop_callbacks = []
118 118 self.start_data = None
119 119 self.stop_data = None
120 120
121 121 @property
122 122 def args(self):
123 123 """A list of cmd and args that will be used to start the process.
124 124
125 125 This is what is passed to :func:`spawnProcess` and the first element
126 126 will be the process name.
127 127 """
128 128 return self.find_args()
129 129
130 130 def find_args(self):
131 131 """The ``.args`` property calls this to find the args list.
132 132
133 133 Subcommand should implement this to construct the cmd and args.
134 134 """
135 135 raise NotImplementedError('find_args must be implemented in a subclass')
136 136
137 137 @property
138 138 def arg_str(self):
139 139 """The string form of the program arguments."""
140 140 return ' '.join(self.args)
141 141
142 142 @property
143 143 def running(self):
144 144 """Am I running."""
145 145 if self.state == 'running':
146 146 return True
147 147 else:
148 148 return False
149 149
150 150 def start(self):
151 151 """Start the process.
152 152
153 153 This must return a deferred that fires with information about the
154 154 process starting (like a pid, job id, etc.).
155 155 """
156 156 raise NotImplementedError('start must be implemented in a subclass')
157 157
158 158 def stop(self):
159 159 """Stop the process and notify observers of stopping.
160 160
161 161 This must return a deferred that fires with information about the
162 162 processing stopping, like errors that occur while the process is
163 163 attempting to be shut down. This deferred won't fire when the process
164 164 actually stops. To observe the actual process stopping, see
165 165 :func:`observe_stop`.
166 166 """
167 167 raise NotImplementedError('stop must be implemented in a subclass')
168 168
169 169 def on_stop(self, f):
170 170 """Get a deferred that will fire when the process stops.
171 171
172 172 The deferred will fire with data that contains information about
173 173 the exit status of the process.
174 174 """
175 175 if self.state=='after':
176 176 return f(self.stop_data)
177 177 else:
178 178 self.stop_callbacks.append(f)
179 179
180 180 def notify_start(self, data):
181 181 """Call this to trigger startup actions.
182 182
183 183 This logs the process startup and sets the state to 'running'. It is
184 184 a pass-through so it can be used as a callback.
185 185 """
186 186
187 187 self.log.info('Process %r started: %r' % (self.args[0], data))
188 188 self.start_data = data
189 189 self.state = 'running'
190 190 return data
191 191
192 192 def notify_stop(self, data):
193 193 """Call this to trigger process stop actions.
194 194
195 195 This logs the process stopping and sets the state to 'after'. Call
196 196 this to trigger all the deferreds from :func:`observe_stop`."""
197 197
198 198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 199 self.stop_data = data
200 200 self.state = 'after'
201 201 for i in range(len(self.stop_callbacks)):
202 202 d = self.stop_callbacks.pop()
203 203 d(data)
204 204 return data
205 205
206 206 def signal(self, sig):
207 207 """Signal the process.
208 208
209 209 Return a semi-meaningless deferred after signaling the process.
210 210
211 211 Parameters
212 212 ----------
213 213 sig : str or int
214 214 'KILL', 'INT', etc., or any signal number
215 215 """
216 216 raise NotImplementedError('signal must be implemented in a subclass')
217 217
218 218
219 219 #-----------------------------------------------------------------------------
220 220 # Local process launchers
221 221 #-----------------------------------------------------------------------------
222 222
223 223
224 224 class LocalProcessLauncher(BaseLauncher):
225 225 """Start and stop an external process in an asynchronous manner.
226 226
227 227 This will launch the external process with a working directory of
228 228 ``self.work_dir``.
229 229 """
230 230
231 231 # This is used to to construct self.args, which is passed to
232 232 # spawnProcess.
233 233 cmd_and_args = List([])
234 234 poll_frequency = Int(100) # in ms
235 235
236 236 def __init__(self, work_dir=u'.', config=None, **kwargs):
237 237 super(LocalProcessLauncher, self).__init__(
238 238 work_dir=work_dir, config=config, **kwargs
239 239 )
240 240 self.process = None
241 241 self.start_deferred = None
242 242 self.poller = None
243 243
244 244 def find_args(self):
245 245 return self.cmd_and_args
246 246
247 247 def start(self):
248 248 if self.state == 'before':
249 249 self.process = Popen(self.args,
250 250 stdout=PIPE,stderr=PIPE,stdin=PIPE,
251 251 env=os.environ,
252 252 cwd=self.work_dir
253 253 )
254 254
255 255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
256 256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
257 257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
258 258 self.poller.start()
259 259 self.notify_start(self.process.pid)
260 260 else:
261 261 s = 'The process was already started and has state: %r' % self.state
262 262 raise ProcessStateError(s)
263 263
264 264 def stop(self):
265 265 return self.interrupt_then_kill()
266 266
267 267 def signal(self, sig):
268 268 if self.state == 'running':
269 269 self.process.send_signal(sig)
270 270
271 271 def interrupt_then_kill(self, delay=2.0):
272 272 """Send INT, wait a delay and then send KILL."""
273 273 self.signal(SIGINT)
274 274 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
275 275 self.killer.start()
276 276
277 277 # callbacks, etc:
278 278
279 279 def handle_stdout(self, fd, events):
280 280 line = self.process.stdout.readline()
281 281 # a stopped process will be readable but return empty strings
282 282 if line:
283 283 self.log.info(line[:-1])
284 284 else:
285 285 self.poll()
286 286
287 287 def handle_stderr(self, fd, events):
288 288 line = self.process.stderr.readline()
289 289 # a stopped process will be readable but return empty strings
290 290 if line:
291 291 self.log.error(line[:-1])
292 292 else:
293 293 self.poll()
294 294
295 295 def poll(self):
296 296 status = self.process.poll()
297 297 if status is not None:
298 298 self.poller.stop()
299 299 self.loop.remove_handler(self.process.stdout.fileno())
300 300 self.loop.remove_handler(self.process.stderr.fileno())
301 301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
302 302 return status
303 303
304 304 class LocalControllerLauncher(LocalProcessLauncher):
305 305 """Launch a controller as a regular external process."""
306 306
307 307 controller_cmd = List(ipcontroller_cmd_argv, config=True)
308 308 # Command line arguments to ipcontroller.
309 309 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
310 310
311 311 def find_args(self):
312 312 return self.controller_cmd + self.controller_args
313 313
314 314 def start(self, cluster_dir):
315 315 """Start the controller by cluster_dir."""
316 316 self.controller_args.extend(['--cluster-dir', cluster_dir])
317 317 self.cluster_dir = unicode(cluster_dir)
318 318 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
319 319 return super(LocalControllerLauncher, self).start()
320 320
321 321
322 322 class LocalEngineLauncher(LocalProcessLauncher):
323 323 """Launch a single engine as a regular externall process."""
324 324
325 325 engine_cmd = List(ipengine_cmd_argv, config=True)
326 326 # Command line arguments for ipengine.
327 327 engine_args = List(
328 328 ['--log-to-file','--log-level', str(logging.INFO)], config=True
329 329 )
330 330
331 331 def find_args(self):
332 332 return self.engine_cmd + self.engine_args
333 333
334 334 def start(self, cluster_dir):
335 335 """Start the engine by cluster_dir."""
336 336 self.engine_args.extend(['--cluster-dir', cluster_dir])
337 337 self.cluster_dir = unicode(cluster_dir)
338 338 return super(LocalEngineLauncher, self).start()
339 339
340 340
341 341 class LocalEngineSetLauncher(BaseLauncher):
342 342 """Launch a set of engines as regular external processes."""
343 343
344 344 # Command line arguments for ipengine.
345 345 engine_args = List(
346 346 ['--log-to-file','--log-level', str(logging.INFO)], config=True
347 347 )
348 348 # launcher class
349 349 launcher_class = LocalEngineLauncher
350 350
351 351 launchers = Dict()
352 352 stop_data = Dict()
353 353
354 354 def __init__(self, work_dir=u'.', config=None, **kwargs):
355 355 super(LocalEngineSetLauncher, self).__init__(
356 356 work_dir=work_dir, config=config, **kwargs
357 357 )
358 358 self.stop_data = {}
359 359
360 360 def start(self, n, cluster_dir):
361 361 """Start n engines by profile or cluster_dir."""
362 362 self.cluster_dir = unicode(cluster_dir)
363 363 dlist = []
364 364 for i in range(n):
365 365 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
366 366 # Copy the engine args over to each engine launcher.
367 367 el.engine_args = copy.deepcopy(self.engine_args)
368 368 el.on_stop(self._notice_engine_stopped)
369 369 d = el.start(cluster_dir)
370 370 if i==0:
371 371 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
372 372 self.launchers[i] = el
373 373 dlist.append(d)
374 374 self.notify_start(dlist)
375 375 # The consumeErrors here could be dangerous
376 376 # dfinal = gatherBoth(dlist, consumeErrors=True)
377 377 # dfinal.addCallback(self.notify_start)
378 378 return dlist
379 379
380 380 def find_args(self):
381 381 return ['engine set']
382 382
383 383 def signal(self, sig):
384 384 dlist = []
385 385 for el in self.launchers.itervalues():
386 386 d = el.signal(sig)
387 387 dlist.append(d)
388 388 # dfinal = gatherBoth(dlist, consumeErrors=True)
389 389 return dlist
390 390
391 391 def interrupt_then_kill(self, delay=1.0):
392 392 dlist = []
393 393 for el in self.launchers.itervalues():
394 394 d = el.interrupt_then_kill(delay)
395 395 dlist.append(d)
396 396 # dfinal = gatherBoth(dlist, consumeErrors=True)
397 397 return dlist
398 398
399 399 def stop(self):
400 400 return self.interrupt_then_kill()
401 401
402 402 def _notice_engine_stopped(self, data):
403 403 pid = data['pid']
404 404 for idx,el in self.launchers.iteritems():
405 405 if el.process.pid == pid:
406 406 break
407 407 self.launchers.pop(idx)
408 408 self.stop_data[idx] = data
409 409 if not self.launchers:
410 410 self.notify_stop(self.stop_data)
411 411
412 412
413 413 #-----------------------------------------------------------------------------
414 414 # MPIExec launchers
415 415 #-----------------------------------------------------------------------------
416 416
417 417
418 418 class MPIExecLauncher(LocalProcessLauncher):
419 419 """Launch an external process using mpiexec."""
420 420
421 421 # The mpiexec command to use in starting the process.
422 422 mpi_cmd = List(['mpiexec'], config=True)
423 423 # The command line arguments to pass to mpiexec.
424 424 mpi_args = List([], config=True)
425 425 # The program to start using mpiexec.
426 426 program = List(['date'], config=True)
427 427 # The command line argument to the program.
428 428 program_args = List([], config=True)
429 429 # The number of instances of the program to start.
430 430 n = Int(1, config=True)
431 431
432 432 def find_args(self):
433 433 """Build self.args using all the fields."""
434 434 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
435 435 self.program + self.program_args
436 436
437 437 def start(self, n):
438 438 """Start n instances of the program using mpiexec."""
439 439 self.n = n
440 440 return super(MPIExecLauncher, self).start()
441 441
442 442
443 443 class MPIExecControllerLauncher(MPIExecLauncher):
444 444 """Launch a controller using mpiexec."""
445 445
446 446 controller_cmd = List(ipcontroller_cmd_argv, config=True)
447 447 # Command line arguments to ipcontroller.
448 448 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
449 449 n = Int(1, config=False)
450 450
451 451 def start(self, cluster_dir):
452 452 """Start the controller by cluster_dir."""
453 453 self.controller_args.extend(['--cluster-dir', cluster_dir])
454 454 self.cluster_dir = unicode(cluster_dir)
455 455 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
456 456 return super(MPIExecControllerLauncher, self).start(1)
457 457
458 458 def find_args(self):
459 459 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
460 460 self.controller_cmd + self.controller_args
461 461
462 462
463 463 class MPIExecEngineSetLauncher(MPIExecLauncher):
464 464
465 465 program = List(ipengine_cmd_argv, config=True)
466 466 # Command line arguments for ipengine.
467 467 program_args = List(
468 468 ['--log-to-file','--log-level', str(logging.INFO)], config=True
469 469 )
470 470 n = Int(1, config=True)
471 471
472 472 def start(self, n, cluster_dir):
473 473 """Start n engines by profile or cluster_dir."""
474 474 self.program_args.extend(['--cluster-dir', cluster_dir])
475 475 self.cluster_dir = unicode(cluster_dir)
476 476 self.n = n
477 477 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
478 478 return super(MPIExecEngineSetLauncher, self).start(n)
479 479
480 480 #-----------------------------------------------------------------------------
481 481 # SSH launchers
482 482 #-----------------------------------------------------------------------------
483 483
484 484 # TODO: Get SSH Launcher working again.
485 485
486 486 class SSHLauncher(LocalProcessLauncher):
487 487 """A minimal launcher for ssh.
488 488
489 489 To be useful this will probably have to be extended to use the ``sshx``
490 490 idea for environment variables. There could be other things this needs
491 491 as well.
492 492 """
493 493
494 494 ssh_cmd = List(['ssh'], config=True)
495 495 ssh_args = List(['-tt'], config=True)
496 496 program = List(['date'], config=True)
497 497 program_args = List([], config=True)
498 498 hostname = CUnicode('', config=True)
499 499 user = CUnicode('', config=True)
500 500 location = CUnicode('')
501 501
502 502 def _hostname_changed(self, name, old, new):
503 503 if self.user:
504 504 self.location = u'%s@%s' % (self.user, new)
505 505 else:
506 506 self.location = new
507 507
508 508 def _user_changed(self, name, old, new):
509 509 self.location = u'%s@%s' % (new, self.hostname)
510 510
511 511 def find_args(self):
512 512 return self.ssh_cmd + self.ssh_args + [self.location] + \
513 513 self.program + self.program_args
514 514
515 515 def start(self, cluster_dir, hostname=None, user=None):
516 516 self.cluster_dir = unicode(cluster_dir)
517 517 if hostname is not None:
518 518 self.hostname = hostname
519 519 if user is not None:
520 520 self.user = user
521 521
522 522 return super(SSHLauncher, self).start()
523 523
524 524 def signal(self, sig):
525 525 if self.state == 'running':
526 526 # send escaped ssh connection-closer
527 527 self.process.stdin.write('~.')
528 528 self.process.stdin.flush()
529 529
530 530
531 531
532 532 class SSHControllerLauncher(SSHLauncher):
533 533
534 534 program = List(ipcontroller_cmd_argv, config=True)
535 535 # Command line arguments to ipcontroller.
536 536 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
537 537
538 538
539 539 class SSHEngineLauncher(SSHLauncher):
540 540 program = List(ipengine_cmd_argv, config=True)
541 541 # Command line arguments for ipengine.
542 542 program_args = List(
543 543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
544 544 )
545 545
546 546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
547 547 launcher_class = SSHEngineLauncher
548 548 engines = Dict(config=True)
549 549
550 550 def start(self, n, cluster_dir):
551 551 """Start engines by profile or cluster_dir.
552 552 `n` is ignored, and the `engines` config property is used instead.
553 553 """
554 554
555 555 self.cluster_dir = unicode(cluster_dir)
556 556 dlist = []
557 557 for host, n in self.engines.iteritems():
558 558 if isinstance(n, (tuple, list)):
559 559 n, args = n
560 560 else:
561 561 args = copy.deepcopy(self.engine_args)
562 562
563 563 if '@' in host:
564 564 user,host = host.split('@',1)
565 565 else:
566 566 user=None
567 567 for i in range(n):
568 568 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
569 569
570 570 # Copy the engine args over to each engine launcher.
571 571 i
572 572 el.program_args = args
573 573 el.on_stop(self._notice_engine_stopped)
574 574 d = el.start(cluster_dir, user=user, hostname=host)
575 575 if i==0:
576 576 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
577 577 self.launchers[host+str(i)] = el
578 578 dlist.append(d)
579 579 self.notify_start(dlist)
580 580 return dlist
581 581
582 582
583 583
584 584 #-----------------------------------------------------------------------------
585 585 # Windows HPC Server 2008 scheduler launchers
586 586 #-----------------------------------------------------------------------------
587 587
588 588
589 589 # This is only used on Windows.
590 590 def find_job_cmd():
591 591 if os.name=='nt':
592 592 try:
593 593 return find_cmd('job')
594 except FindCmdError:
594 except (FindCmdError, ImportError):
595 # ImportError will be raised if win32api is not installed
595 596 return 'job'
596 597 else:
597 598 return 'job'
598 599
599 600
600 601 class WindowsHPCLauncher(BaseLauncher):
601 602
602 603 # A regular expression used to get the job id from the output of the
603 604 # submit_command.
604 605 job_id_regexp = Str(r'\d+', config=True)
605 606 # The filename of the instantiated job script.
606 607 job_file_name = CUnicode(u'ipython_job.xml', config=True)
607 608 # The full path to the instantiated job script. This gets made dynamically
608 609 # by combining the work_dir with the job_file_name.
609 610 job_file = CUnicode(u'')
610 611 # The hostname of the scheduler to submit the job to
611 612 scheduler = CUnicode('', config=True)
612 613 job_cmd = CUnicode(find_job_cmd(), config=True)
613 614
614 615 def __init__(self, work_dir=u'.', config=None, **kwargs):
615 616 super(WindowsHPCLauncher, self).__init__(
616 617 work_dir=work_dir, config=config, **kwargs
617 618 )
618 619
619 620 @property
620 621 def job_file(self):
621 622 return os.path.join(self.work_dir, self.job_file_name)
622 623
623 624 def write_job_file(self, n):
624 625 raise NotImplementedError("Implement write_job_file in a subclass.")
625 626
626 627 def find_args(self):
627 628 return [u'job.exe']
628 629
629 630 def parse_job_id(self, output):
630 631 """Take the output of the submit command and return the job id."""
631 632 m = re.search(self.job_id_regexp, output)
632 633 if m is not None:
633 634 job_id = m.group()
634 635 else:
635 636 raise LauncherError("Job id couldn't be determined: %s" % output)
636 637 self.job_id = job_id
637 638 self.log.info('Job started with job id: %r' % job_id)
638 639 return job_id
639 640
640 641 def start(self, n):
641 642 """Start n copies of the process using the Win HPC job scheduler."""
642 643 self.write_job_file(n)
643 644 args = [
644 645 'submit',
645 646 '/jobfile:%s' % self.job_file,
646 647 '/scheduler:%s' % self.scheduler
647 648 ]
648 649 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
649 650 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
650 651 output = check_output([self.job_cmd]+args,
651 652 env=os.environ,
652 653 cwd=self.work_dir,
653 654 stderr=STDOUT
654 655 )
655 656 job_id = self.parse_job_id(output)
656 657 self.notify_start(job_id)
657 658 return job_id
658 659
659 660 def stop(self):
660 661 args = [
661 662 'cancel',
662 663 self.job_id,
663 664 '/scheduler:%s' % self.scheduler
664 665 ]
665 666 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
666 667 try:
667 668 output = check_output([self.job_cmd]+args,
668 669 env=os.environ,
669 670 cwd=self.work_dir,
670 671 stderr=STDOUT
671 672 )
672 673 except:
673 674 output = 'The job already appears to be stoppped: %r' % self.job_id
674 675 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
675 676 return output
676 677
677 678
678 679 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
679 680
680 681 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
681 682 extra_args = List([], config=False)
682 683
683 684 def write_job_file(self, n):
684 685 job = IPControllerJob(config=self.config)
685 686
686 687 t = IPControllerTask(config=self.config)
687 688 # The tasks work directory is *not* the actual work directory of
688 689 # the controller. It is used as the base path for the stdout/stderr
689 690 # files that the scheduler redirects to.
690 691 t.work_directory = self.cluster_dir
691 692 # Add the --cluster-dir and from self.start().
692 693 t.controller_args.extend(self.extra_args)
693 694 job.add_task(t)
694 695
695 696 self.log.info("Writing job description file: %s" % self.job_file)
696 697 job.write(self.job_file)
697 698
698 699 @property
699 700 def job_file(self):
700 701 return os.path.join(self.cluster_dir, self.job_file_name)
701 702
702 703 def start(self, cluster_dir):
703 704 """Start the controller by cluster_dir."""
704 705 self.extra_args = ['--cluster-dir', cluster_dir]
705 706 self.cluster_dir = unicode(cluster_dir)
706 707 return super(WindowsHPCControllerLauncher, self).start(1)
707 708
708 709
709 710 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
710 711
711 712 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
712 713 extra_args = List([], config=False)
713 714
714 715 def write_job_file(self, n):
715 716 job = IPEngineSetJob(config=self.config)
716 717
717 718 for i in range(n):
718 719 t = IPEngineTask(config=self.config)
719 720 # The tasks work directory is *not* the actual work directory of
720 721 # the engine. It is used as the base path for the stdout/stderr
721 722 # files that the scheduler redirects to.
722 723 t.work_directory = self.cluster_dir
723 724 # Add the --cluster-dir and from self.start().
724 725 t.engine_args.extend(self.extra_args)
725 726 job.add_task(t)
726 727
727 728 self.log.info("Writing job description file: %s" % self.job_file)
728 729 job.write(self.job_file)
729 730
730 731 @property
731 732 def job_file(self):
732 733 return os.path.join(self.cluster_dir, self.job_file_name)
733 734
734 735 def start(self, n, cluster_dir):
735 736 """Start the controller by cluster_dir."""
736 737 self.extra_args = ['--cluster-dir', cluster_dir]
737 738 self.cluster_dir = unicode(cluster_dir)
738 739 return super(WindowsHPCEngineSetLauncher, self).start(n)
739 740
740 741
741 742 #-----------------------------------------------------------------------------
742 743 # Batch (PBS) system launchers
743 744 #-----------------------------------------------------------------------------
744 745
745 746 class BatchSystemLauncher(BaseLauncher):
746 747 """Launch an external process using a batch system.
747 748
748 749 This class is designed to work with UNIX batch systems like PBS, LSF,
749 750 GridEngine, etc. The overall model is that there are different commands
750 751 like qsub, qdel, etc. that handle the starting and stopping of the process.
751 752
752 753 This class also has the notion of a batch script. The ``batch_template``
753 754 attribute can be set to a string that is a template for the batch script.
754 755 This template is instantiated using Itpl. Thus the template can use
755 756 ${n} fot the number of instances. Subclasses can add additional variables
756 757 to the template dict.
757 758 """
758 759
759 760 # Subclasses must fill these in. See PBSEngineSet
760 761 # The name of the command line program used to submit jobs.
761 762 submit_command = List([''], config=True)
762 763 # The name of the command line program used to delete jobs.
763 764 delete_command = List([''], config=True)
764 765 # A regular expression used to get the job id from the output of the
765 766 # submit_command.
766 767 job_id_regexp = CUnicode('', config=True)
767 768 # The string that is the batch script template itself.
768 769 batch_template = CUnicode('', config=True)
769 770 # The file that contains the batch template
770 771 batch_template_file = CUnicode(u'', config=True)
771 772 # The filename of the instantiated batch script.
772 773 batch_file_name = CUnicode(u'batch_script', config=True)
773 774 # The PBS Queue
774 775 queue = CUnicode(u'', config=True)
775 776
776 777 # not configurable, override in subclasses
777 778 # PBS Job Array regex
778 779 job_array_regexp = CUnicode('')
779 780 job_array_template = CUnicode('')
780 781 # PBS Queue regex
781 782 queue_regexp = CUnicode('')
782 783 queue_template = CUnicode('')
783 784 # The default batch template, override in subclasses
784 785 default_template = CUnicode('')
785 786 # The full path to the instantiated batch script.
786 787 batch_file = CUnicode(u'')
787 788 # the format dict used with batch_template:
788 789 context = Dict()
789 790
790 791
791 792 def find_args(self):
792 793 return self.submit_command + [self.batch_file]
793 794
794 795 def __init__(self, work_dir=u'.', config=None, **kwargs):
795 796 super(BatchSystemLauncher, self).__init__(
796 797 work_dir=work_dir, config=config, **kwargs
797 798 )
798 799 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
799 800
800 801 def parse_job_id(self, output):
801 802 """Take the output of the submit command and return the job id."""
802 803 m = re.search(self.job_id_regexp, output)
803 804 if m is not None:
804 805 job_id = m.group()
805 806 else:
806 807 raise LauncherError("Job id couldn't be determined: %s" % output)
807 808 self.job_id = job_id
808 809 self.log.info('Job submitted with job id: %r' % job_id)
809 810 return job_id
810 811
811 812 def write_batch_script(self, n):
812 813 """Instantiate and write the batch script to the work_dir."""
813 814 self.context['n'] = n
814 815 self.context['queue'] = self.queue
815 816 print self.context
816 817 # first priority is batch_template if set
817 818 if self.batch_template_file and not self.batch_template:
818 819 # second priority is batch_template_file
819 820 with open(self.batch_template_file) as f:
820 821 self.batch_template = f.read()
821 822 if not self.batch_template:
822 823 # third (last) priority is default_template
823 824 self.batch_template = self.default_template
824 825
825 826 regex = re.compile(self.job_array_regexp)
826 827 # print regex.search(self.batch_template)
827 828 if not regex.search(self.batch_template):
828 829 self.log.info("adding job array settings to batch script")
829 830 firstline, rest = self.batch_template.split('\n',1)
830 831 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
831 832
832 833 regex = re.compile(self.queue_regexp)
833 834 # print regex.search(self.batch_template)
834 835 if self.queue and not regex.search(self.batch_template):
835 836 self.log.info("adding PBS queue settings to batch script")
836 837 firstline, rest = self.batch_template.split('\n',1)
837 838 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
838 839
839 840 script_as_string = Itpl.itplns(self.batch_template, self.context)
840 841 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
841 842
842 843 with open(self.batch_file, 'w') as f:
843 844 f.write(script_as_string)
844 845 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
845 846
846 847 def start(self, n, cluster_dir):
847 848 """Start n copies of the process using a batch system."""
848 849 # Here we save profile and cluster_dir in the context so they
849 850 # can be used in the batch script template as ${profile} and
850 851 # ${cluster_dir}
851 852 self.context['cluster_dir'] = cluster_dir
852 853 self.cluster_dir = unicode(cluster_dir)
853 854 self.write_batch_script(n)
854 855 output = check_output(self.args, env=os.environ)
855 856
856 857 job_id = self.parse_job_id(output)
857 858 self.notify_start(job_id)
858 859 return job_id
859 860
860 861 def stop(self):
861 862 output = check_output(self.delete_command+[self.job_id], env=os.environ)
862 863 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
863 864 return output
864 865
865 866
866 867 class PBSLauncher(BatchSystemLauncher):
867 868 """A BatchSystemLauncher subclass for PBS."""
868 869
869 870 submit_command = List(['qsub'], config=True)
870 871 delete_command = List(['qdel'], config=True)
871 872 job_id_regexp = CUnicode(r'\d+', config=True)
872 873
873 874 batch_file = CUnicode(u'')
874 875 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
875 876 job_array_template = CUnicode('#PBS -t 1-$n')
876 877 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
877 878 queue_template = CUnicode('#PBS -q $queue')
878 879
879 880
880 881 class PBSControllerLauncher(PBSLauncher):
881 882 """Launch a controller using PBS."""
882 883
883 884 batch_file_name = CUnicode(u'pbs_controller', config=True)
884 885 default_template= CUnicode("""#!/bin/sh
885 886 #PBS -V
886 887 #PBS -N ipcontroller
887 888 %s --log-to-file --cluster-dir $cluster_dir
888 889 """%(' '.join(ipcontroller_cmd_argv)))
889 890
890 891 def start(self, cluster_dir):
891 892 """Start the controller by profile or cluster_dir."""
892 893 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
893 894 return super(PBSControllerLauncher, self).start(1, cluster_dir)
894 895
895 896
896 897 class PBSEngineSetLauncher(PBSLauncher):
897 898 """Launch Engines using PBS"""
898 899 batch_file_name = CUnicode(u'pbs_engines', config=True)
899 900 default_template= CUnicode(u"""#!/bin/sh
900 901 #PBS -V
901 902 #PBS -N ipengine
902 903 %s --cluster-dir $cluster_dir
903 904 """%(' '.join(ipengine_cmd_argv)))
904 905
905 906 def start(self, n, cluster_dir):
906 907 """Start n engines by profile or cluster_dir."""
907 908 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
908 909 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
909 910
910 911 #SGE is very similar to PBS
911 912
912 913 class SGELauncher(PBSLauncher):
913 914 """Sun GridEngine is a PBS clone with slightly different syntax"""
914 915 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
915 916 job_array_template = CUnicode('#$$ -t 1-$n')
916 917 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
917 918 queue_template = CUnicode('#$$ -q $queue')
918 919
919 920 class SGEControllerLauncher(SGELauncher):
920 921 """Launch a controller using SGE."""
921 922
922 923 batch_file_name = CUnicode(u'sge_controller', config=True)
923 924 default_template= CUnicode(u"""#$$ -V
924 925 #$$ -S /bin/sh
925 926 #$$ -N ipcontroller
926 927 %s --log-to-file --cluster-dir $cluster_dir
927 928 """%(' '.join(ipcontroller_cmd_argv)))
928 929
929 930 def start(self, cluster_dir):
930 931 """Start the controller by profile or cluster_dir."""
931 932 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
932 933 return super(PBSControllerLauncher, self).start(1, cluster_dir)
933 934
934 935 class SGEEngineSetLauncher(SGELauncher):
935 936 """Launch Engines with SGE"""
936 937 batch_file_name = CUnicode(u'sge_engines', config=True)
937 938 default_template = CUnicode("""#$$ -V
938 939 #$$ -S /bin/sh
939 940 #$$ -N ipengine
940 941 %s --cluster-dir $cluster_dir
941 942 """%(' '.join(ipengine_cmd_argv)))
942 943
943 944 def start(self, n, cluster_dir):
944 945 """Start n engines by profile or cluster_dir."""
945 946 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
946 947 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
947 948
948 949
949 950 #-----------------------------------------------------------------------------
950 951 # A launcher for ipcluster itself!
951 952 #-----------------------------------------------------------------------------
952 953
953 954
954 955 class IPClusterLauncher(LocalProcessLauncher):
955 956 """Launch the ipcluster program in an external process."""
956 957
957 958 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
958 959 # Command line arguments to pass to ipcluster.
959 960 ipcluster_args = List(
960 961 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
961 962 ipcluster_subcommand = Str('start')
962 963 ipcluster_n = Int(2)
963 964
964 965 def find_args(self):
965 966 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
966 967 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
967 968
968 969 def start(self):
969 970 self.log.info("Starting ipcluster: %r" % self.args)
970 971 return super(IPClusterLauncher, self).start()
971 972
General Comments 0
You need to be logged in to leave comments. Login now