##// END OF EJS Templates
forward subprocess IO over zmq on Windows...
MinRK -
Show More
@@ -0,0 +1,67
1 #!/usr/bin/env python
2 """Utility for forwarding file read events over a zmq socket.
3
This is necessary because select on Windows only supports sockets, not file descriptors."""
5
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2011 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16
17 import uuid
18 import zmq
19
20 from threading import Thread
21
22 #-----------------------------------------------------------------------------
23 # Code
24 #-----------------------------------------------------------------------------
25
class ForwarderThread(Thread):
    """Daemon thread that copies lines from a file object onto a zmq socket."""

    def __init__(self, sock, fd):
        Thread.__init__(self)
        # daemonize, so a still-open pipe never blocks interpreter exit
        self.daemon = True
        self.sock = sock
        self.fd = fd

    def run(self):
        """Loop over lines in self.fd, sending each one over self.sock."""
        current = self.fd.readline()
        # Files opened in unicode mode yield unicode lines; pick the
        # matching send method once, up front.
        send = self.sock.send_unicode if isinstance(current, unicode) else self.sock.send
        while current:
            send(current)
            current = self.fd.readline()
        # readline() returning '' signals EOF: close both ends.
        self.fd.close()
        self.sock.close()
47
def forward_read_events(fd, context=None):
    """Forward read events from an FD over a zmq socket pair.

    Wraps the file in an inproc PUSH/PULL pair, so the returned PULL
    socket can be polled for read events by select (specifically
    zmq.eventloop.ioloop), which cannot poll pipe FDs on Windows.
    """
    if context is None:
        context = zmq.Context.instance()
    endpoint = 'inproc://%s' % uuid.uuid4()
    push = context.socket(zmq.PUSH)
    push.setsockopt(zmq.LINGER, -1)
    pull = context.socket(zmq.PULL)
    # bind before connect — required ordering for inproc transports
    push.bind(endpoint)
    pull.connect(endpoint)
    ForwarderThread(push, fd).start()
    return pull


__all__ = ['forward_read_events']
@@ -1,972 +1,985
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23
24 24 from signal import SIGINT, SIGTERM
25 25 try:
26 26 from signal import SIGKILL
27 27 except ImportError:
28 28 SIGKILL=SIGTERM
29 29
30 30 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # pre-2.7: no subprocess.check_output, so emulate it with Popen.
    def check_output(*args, **kwargs):
        """Run a command and return its stdout as a string (stderr not captured)."""
        kwargs['stdout'] = PIPE
        proc = Popen(*args, **kwargs)
        stdout, _ = proc.communicate()
        return stdout
40 40
41 41 from zmq.eventloop import ioloop
42 42
43 43 from IPython.external import Itpl
44 44 # from IPython.config.configurable import Configurable
45 45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
46 46 from IPython.utils.path import get_ipython_module_path
47 47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
48 48
49 49 from IPython.parallel.factory import LoggingFactory
50 50
51 from .win32support import forward_read_events
52
51 53 # load winhpcjob only on Windows
52 54 try:
53 55 from .winhpcjob import (
54 56 IPControllerTask, IPEngineTask,
55 57 IPControllerJob, IPEngineSetJob
56 58 )
57 59 except ImportError:
58 60 pass
59 61
60
62 WINDOWS = os.name == 'nt'
61 63 #-----------------------------------------------------------------------------
62 64 # Paths to the kernel apps
63 65 #-----------------------------------------------------------------------------
64 66
65 67
66 68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
67 69 'IPython.parallel.apps.ipclusterapp'
68 70 ))
69 71
70 72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
71 73 'IPython.parallel.apps.ipengineapp'
72 74 ))
73 75
74 76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
75 77 'IPython.parallel.apps.ipcontrollerapp'
76 78 ))
77 79
78 80 #-----------------------------------------------------------------------------
79 81 # Base launchers and errors
80 82 #-----------------------------------------------------------------------------
81 83
82 84
class LauncherError(Exception):
    """Base class for all launcher errors."""
    pass
85 87
86 88
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state."""
    pass
89 91
90 92
class UnknownStatus(LauncherError):
    """Raised when a launcher reports a status that cannot be interpreted."""
    pass
93 95
94 96
class BaseLauncher(LoggingFactory):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the cluster_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the --work-dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # Data recorded by notify_start/notify_stop (pid, job id, exit code, ...)
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        # traitlets dynamic default: share the global IOLoop instance
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before' # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True iff the launcher state is 'running'."""
        if self.state == 'running':
            return True
        else:
            return False

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        processing stopping, like errors that occur while the process is
        attempting to be shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register callback ``f`` to fire (with stop data) when the process stops.

        If the process has already stopped, ``f`` is called immediately with
        the recorded stop data.
        """
        if self.state=='after':
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        self.log.info('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping, sets the state to 'after', and fires
        every callback registered via :func:`on_stop` (each exactly once).
        """

        self.log.info('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # drain the callback list so each registered callback fires once
        for i in range(len(self.stop_callbacks)):
            d = self.stop_callbacks.pop()
            d(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
217 219
218 220
219 221 #-----------------------------------------------------------------------------
220 222 # Local process launchers
221 223 #-----------------------------------------------------------------------------
222 224
223 225
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    poll_frequency = Int(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.start_deferred = None
        self.poller = None

    def find_args(self):
        """Return the configured command and arguments verbatim."""
        return self.cmd_and_args

    def start(self):
        """Spawn the subprocess and wire its stdout/stderr into the IOLoop.

        On Windows the pipe FDs are forwarded over zmq sockets
        (win32support.forward_read_events) because select cannot poll pipes
        there; elsewhere the raw file descriptors are registered directly.
        """
        if self.state == 'before':
            self.process = Popen(self.args,
                stdout=PIPE,stderr=PIPE,stdin=PIPE,
                env=os.environ,
                cwd=self.work_dir
            )
            if WINDOWS:
                # self.stdout/self.stderr are zmq PULL sockets here
                self.stdout = forward_read_events(self.process.stdout)
                self.stderr = forward_read_events(self.process.stderr)
            else:
                # plain file descriptors elsewhere
                self.stdout = self.process.stdout.fileno()
                self.stderr = self.process.stderr.fileno()
            self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
            self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
            # poll periodically so we notice the child exiting
            self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
            self.poller.start()
            self.notify_start(self.process.pid)
        else:
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

    def stop(self):
        """Stop by sending SIGINT, then SIGKILL after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        # only a live process can be signaled
        if self.state == 'running':
            self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        self.signal(SIGINT)
        # keep a reference so the DelayedCallback isn't garbage collected
        self.killer  = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        """IOLoop read callback: log one line of the child's stdout."""
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.info(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        """IOLoop read callback: log one line of the child's stderr as an error."""
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.error(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check whether the child exited; if so, unregister handlers and notify."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
303 316
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    # argv for starting ipcontroller (module path resolved at import time)
    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)

    def find_args(self):
        """Full argv: the ipcontroller command plus its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, cluster_dir):
        """Start the controller by cluster_dir.

        NOTE(review): extend() mutates controller_args, so a second call to
        start() would append --cluster-dir again — assumed single-use.
        """
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
320 333
321 334
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # argv for starting ipengine (module path resolved at import time)
    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )

    def find_args(self):
        """Full argv: the ipengine command plus its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, cluster_dir):
        """Start the engine by cluster_dir.

        NOTE(review): extend() mutates engine_args on each call — assumed
        single-use, as in LocalControllerLauncher.
        """
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
339 352
340 353
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    # launcher class used for each individual engine
    launcher_class = LocalEngineLauncher

    # engine index -> launcher instance, for engines still running
    launchers = Dict()
    # engine index -> stop data, for engines that have exited
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
            # Copy the engine args over to each engine launcher.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(cluster_dir)
            if i==0:
                # only log the args once; they are the same for every engine
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        # The consumeErrors here could be dangerous
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        # dfinal.addCallback(self.notify_start)
        return dlist

    def find_args(self):
        """No single argv exists for a set; return a placeholder name."""
        return ['engine set']

    def signal(self, sig):
        """Forward the signal to every engine launcher still running."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Send INT to each engine, then KILL after `delay` seconds."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def stop(self):
        """Stop all engines (interrupt, then kill)."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        # Find the launcher whose child pid matches, retire it, and fire
        # notify_stop once all engines are gone.
        pid = data['pid']
        for idx,el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
411 424
412 425
413 426 #-----------------------------------------------------------------------------
414 427 # MPIExec launchers
415 428 #-----------------------------------------------------------------------------
416 429
417 430
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields: mpiexec -n N [mpi_args] program [program_args]."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
441 454
442 455
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # argv for starting ipcontroller (module path resolved at import time)
    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
    # A controller is a singleton: always exactly one process.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir.

        NOTE(review): extend() mutates controller_args, so a second call to
        start() would append --cluster-dir again — assumed single-use.
        """
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the full mpiexec argv for the controller.

        Fix: ``self.n`` must be stringified — the base class's find_args
        uses ``str(self.n)``; passing the raw int here breaks ``arg_str``
        (' '.join over the args) and the Popen argv.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
461 474
462 475
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines under a single mpiexec invocation."""

    # argv for starting ipengine (module path resolved at import time)
    program = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    # number of engine instances mpiexec should start
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir.

        NOTE(review): extend() mutates program_args; repeated start() calls
        would duplicate --cluster-dir — assumed single-use.
        """
        self.program_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)
479 492
480 493 #-----------------------------------------------------------------------------
481 494 # SSH launchers
482 495 #-----------------------------------------------------------------------------
483 496
484 497 # TODO: Get SSH Launcher working again.
485 498
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    # ssh executable and its options ('-tt' forces a tty allocation)
    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List(['-tt'], config=True)
    # remote program and its arguments
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = CUnicode('', config=True)
    user = CUnicode('', config=True)
    # derived "user@host" target, kept in sync by the change handlers below
    location = CUnicode('')

    def _hostname_changed(self, name, old, new):
        # traitlets change handler: rebuild location when hostname changes
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # traitlets change handler: rebuild location when user changes
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """argv: ssh [ssh_args] user@host program [program_args]."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, cluster_dir, hostname=None, user=None):
        """Start the remote program; hostname/user arguments override the traits."""
        self.cluster_dir = unicode(cluster_dir)
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        """'Signal' the remote process by closing the ssh connection.

        NOTE(review): `sig` is ignored — any signal sends the '~.' escape,
        which terminates the ssh session rather than delivering the signal
        to the remote process.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
529 542
530 543
531 544
class SSHControllerLauncher(SSHLauncher):
    """Launch ipcontroller on a remote host over ssh."""

    program = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller ('-r' reuses existing connection files).
    program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
537 550
538 551
class SSHEngineLauncher(SSHLauncher):
    """Launch a single ipengine on a remote host over ssh."""

    program = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
545 558
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch engines over ssh, one launcher per engine per host."""

    launcher_class = SSHEngineLauncher
    # mapping of host (optionally 'user@host') -> n or (n, args)
    engines = Dict(config=True)

    def start(self, n, cluster_dir):
        """Start engines by profile or cluster_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for host, n in self.engines.iteritems():
            # each value is either a count, or a (count, args) pair
            if isinstance(n, (tuple, list)):
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)

                # Copy the engine args over to each engine launcher.
                # (fix: removed a stray no-op `i` expression statement here)
                # NOTE(review): `args` is shared by all engines on one host;
                # mutating it in one launcher would affect the others.
                el.program_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(cluster_dir, user=user, hostname=host)
                if i==0:
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                self.launchers[host+str(i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
581 594
582 595
583 596
584 597 #-----------------------------------------------------------------------------
585 598 # Windows HPC Server 2008 scheduler launchers
586 599 #-----------------------------------------------------------------------------
587 600
588 601
589 602 # This is only used on Windows.
def find_job_cmd():
    """Return the Windows HPC 'job' command, falling back to the bare name."""
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError is raised when win32api is not installed
        return 'job'
599 612
600 613
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits jobs via the Windows HPC 'job' command."""

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = CUnicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the `job_file` property defined
    # later in this class body — confirm the trait is still needed.
    job_file = CUnicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = CUnicode('', config=True)
    # Path or name of the 'job' command-line tool (resolved at import time).
    job_cmd = CUnicode(find_job_cmd(), config=True)

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Absolute path of the job description XML file."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job XML for n tasks; subclasses must implement."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        # only consumed via .args for logging; start() builds its own argv
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        # Twisted will raise DeprecationWarnings if we try to pass unicode to this
        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job via 'job cancel'."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except:
            # best-effort: the job may already have been cancelled or finished
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
677 690
678 691
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch an ipcontroller job on a Windows HPC scheduler."""

    job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
    # extra ipcontroller args injected by start() (not user-configurable)
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a single-task controller job description file."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.cluster_dir
        # Add the --cluster-dir and from self.start().
        t.controller_args.extend(self.extra_args)
        job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Job file lives in the cluster_dir (set by start())."""
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
708 721
709 722
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of ipengine tasks as one Windows HPC job."""

    job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
    # extra ipengine args injected by start() (not user-configurable)
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description file containing n engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.cluster_dir
            # Add the --cluster-dir and from self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Job file lives in the cluster_dir (set by start())."""
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
740 753
741 754
742 755 #-----------------------------------------------------------------------------
743 756 # Batch (PBS) system launchers
744 757 #-----------------------------------------------------------------------------
745 758
746 759 class BatchSystemLauncher(BaseLauncher):
747 760 """Launch an external process using a batch system.
748 761
749 762 This class is designed to work with UNIX batch systems like PBS, LSF,
750 763 GridEngine, etc. The overall model is that there are different commands
751 764 like qsub, qdel, etc. that handle the starting and stopping of the process.
752 765
753 766 This class also has the notion of a batch script. The ``batch_template``
754 767 attribute can be set to a string that is a template for the batch script.
755 768 This template is instantiated using Itpl. Thus the template can use
756 769 ${n} fot the number of instances. Subclasses can add additional variables
757 770 to the template dict.
758 771 """
759 772
    # Subclasses must fill these in. See PBSEngineSet for an example.
    # The name of the command line program used to submit jobs.
    submit_command = List([''], config=True)
    # The name of the command line program used to delete jobs.
    delete_command = List([''], config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = CUnicode('', config=True)
    # The string that is the batch script template itself.
    batch_template = CUnicode('', config=True)
    # The file that contains the batch template.
    batch_template_file = CUnicode(u'', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = CUnicode(u'batch_script', config=True)
    # The batch queue to submit to (substituted into the template as ${queue}).
    queue = CUnicode(u'', config=True)

    # Not configurable, override in subclasses.
    # Regex used to detect whether the template already sets a job array.
    job_array_regexp = CUnicode('')
    # Line inserted into the template when no job array directive is found.
    job_array_template = CUnicode('')
    # Regex used to detect whether the template already selects a queue.
    queue_regexp = CUnicode('')
    # Line inserted into the template when no queue directive is found.
    queue_template = CUnicode('')
    # The default batch template, override in subclasses.
    default_template = CUnicode('')
    # The full path to the instantiated batch script.
    batch_file = CUnicode(u'')
    # The format dict used when instantiating batch_template.
    context = Dict()
790 803
791 804
792 805 def find_args(self):
793 806 return self.submit_command + [self.batch_file]
794 807
    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Forward construction to the superclass, then compute the full
        path of the batch script this launcher will write and submit.
        """
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # batch_file is the absolute path of the instantiated script
        # (work_dir is set by the superclass constructor).
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
800 813
801 814 def parse_job_id(self, output):
802 815 """Take the output of the submit command and return the job id."""
803 816 m = re.search(self.job_id_regexp, output)
804 817 if m is not None:
805 818 job_id = m.group()
806 819 else:
807 820 raise LauncherError("Job id couldn't be determined: %s" % output)
808 821 self.job_id = job_id
809 822 self.log.info('Job submitted with job id: %r' % job_id)
810 823 return job_id
811 824
812 825 def write_batch_script(self, n):
813 826 """Instantiate and write the batch script to the work_dir."""
814 827 self.context['n'] = n
815 828 self.context['queue'] = self.queue
816 829 print self.context
817 830 # first priority is batch_template if set
818 831 if self.batch_template_file and not self.batch_template:
819 832 # second priority is batch_template_file
820 833 with open(self.batch_template_file) as f:
821 834 self.batch_template = f.read()
822 835 if not self.batch_template:
823 836 # third (last) priority is default_template
824 837 self.batch_template = self.default_template
825 838
826 839 regex = re.compile(self.job_array_regexp)
827 840 # print regex.search(self.batch_template)
828 841 if not regex.search(self.batch_template):
829 842 self.log.info("adding job array settings to batch script")
830 843 firstline, rest = self.batch_template.split('\n',1)
831 844 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
832 845
833 846 regex = re.compile(self.queue_regexp)
834 847 # print regex.search(self.batch_template)
835 848 if self.queue and not regex.search(self.batch_template):
836 849 self.log.info("adding PBS queue settings to batch script")
837 850 firstline, rest = self.batch_template.split('\n',1)
838 851 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
839 852
840 853 script_as_string = Itpl.itplns(self.batch_template, self.context)
841 854 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
842 855
843 856 with open(self.batch_file, 'w') as f:
844 857 f.write(script_as_string)
845 858 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
846 859
    def start(self, n, cluster_dir):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs the submit command, records the
        returned job id and notifies observers of the start.
        """
        # Here we save profile and cluster_dir in the context so they
        # can be used in the batch script template as ${profile} and
        # ${cluster_dir}
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        self.write_batch_script(n)
        # submit via the configured command (e.g. qsub <batch_file>)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
860 873
    def stop(self):
        """Delete the submitted job via the batch system's delete command.

        Returns the delete command's output and forwards it to observers.
        """
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
865 878
866 879
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # PBS submits with qsub and deletes with qdel.
    submit_command = List(['qsub'], config=True)
    delete_command = List(['qdel'], config=True)
    # qsub prints the numeric job id first in its output.
    job_id_regexp = CUnicode(r'\d+', config=True)

    batch_file = CUnicode(u'')
    # detect / insert the "#PBS -t" job array directive
    job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = CUnicode('#PBS -t 1-$n')
    # detect / insert the "#PBS -q" queue directive
    queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
    queue_template = CUnicode('#PBS -q $queue')
879 892
880 893
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = CUnicode(u'pbs_controller', config=True)
    # minimal PBS script: export environment, name the job, run ipcontroller
    default_template= CUnicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --cluster-dir $cluster_dir
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, cluster_dir):
        """Start the controller by profile or cluster_dir."""
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        # a controller is a single process, hence n=1
        return super(PBSControllerLauncher, self).start(1, cluster_dir)
895 908
896 909
class PBSEngineSetLauncher(PBSLauncher):
    """Launch Engines using PBS"""
    batch_file_name = CUnicode(u'pbs_engines', config=True)
    # minimal PBS script: export environment, name the job, run ipengine
    default_template= CUnicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --cluster-dir $cluster_dir
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
910 923
911 924 #SGE is very similar to PBS
912 925
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # SGE directive lines start with "#$" instead of "#PBS"
    # ($$ in the pattern is a literal '$' escaped for Itpl interpolation)
    job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
    job_array_template = CUnicode('#$$ -t 1-$n')
    queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
    queue_template = CUnicode('#$$ -q $queue')
919 932
class SGEControllerLauncher(SGELauncher):
    """Launch a controller using SGE."""

    batch_file_name = CUnicode(u'sge_controller', config=True)
    # minimal SGE script: export environment, use sh, name the job,
    # run ipcontroller
    default_template= CUnicode(u"""#$$ -V
#$$ -S /bin/sh
#$$ -N ipcontroller
%s --log-to-file --cluster-dir $cluster_dir
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, cluster_dir):
        """Start the controller by profile or cluster_dir."""
        # BUGFIX: this copy-pasted PBSControllerLauncher in both the log
        # message and the super() call; PBSControllerLauncher is not in
        # this class's MRO, so super(PBSControllerLauncher, self) raised
        # TypeError at runtime.
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        # a controller is a single process, hence n=1
        return super(SGEControllerLauncher, self).start(1, cluster_dir)
934 947
class SGEEngineSetLauncher(SGELauncher):
    """Launch Engines with SGE"""
    batch_file_name = CUnicode(u'sge_engines', config=True)
    # minimal SGE script: export environment, use sh, name the job,
    # run ipengine
    default_template = CUnicode("""#$$ -V
#$$ -S /bin/sh
#$$ -N ipengine
%s --cluster-dir $cluster_dir
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
        return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
948 961
949 962
950 963 #-----------------------------------------------------------------------------
951 964 # A launcher for ipcluster itself!
952 965 #-----------------------------------------------------------------------------
953 966
954 967
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
    ipcluster_subcommand = Str('start')
    ipcluster_n = Int(2)

    def find_args(self):
        """Assemble the full ipcluster command line."""
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.extend(['-n', repr(self.ipcluster_n)])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Log the command line, then launch via the superclass."""
        self.log.info("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
972 985
General Comments 0
You need to be logged in to leave comments. Login now