improve process cleanup on Windows...
MinRK
@@ -1,994 +1,996 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23
24 # signal imports, handling various platforms, versions
25
24 26 from signal import SIGINT, SIGTERM
25 27 try:
26 28 from signal import SIGKILL
27 29 except ImportError:
28 # windows
30 # Windows
29 31 SIGKILL=SIGTERM
30 32
33 try:
34 # on Windows, CTRL_C_EVENT is available with Python >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
37 pass
38
31 39 from subprocess import Popen, PIPE, STDOUT
32 40 try:
33 41 from subprocess import check_output
34 42 except ImportError:
35 43 # pre-2.7, define check_output with Popen
36 44 def check_output(*args, **kwargs):
37 45 kwargs.update(dict(stdout=PIPE))
38 46 p = Popen(*args, **kwargs)
39 47 out,err = p.communicate()
40 48 return out
41 49
42 50 from zmq.eventloop import ioloop
43 51
44 52 from IPython.external import Itpl
45 53 # from IPython.config.configurable import Configurable
46 54 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
47 55 from IPython.utils.path import get_ipython_module_path
48 56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
49 57
50 58 from IPython.parallel.factory import LoggingFactory
51 59
52 60 from .win32support import forward_read_events
53 61
54 # load winhpcjob only on Windows
55 try:
56 from .winhpcjob import (
57 IPControllerTask, IPEngineTask,
58 IPControllerJob, IPEngineSetJob
59 )
60 except ImportError:
61 pass
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62 63
63 64 WINDOWS = os.name == 'nt'
64 65
65 if WINDOWS:
66 try:
67 # >= 2.7, 3.2
68 from signal import CTRL_C_EVENT as SIGINT
69 except ImportError:
70 pass
71
72 66 #-----------------------------------------------------------------------------
73 67 # Paths to the kernel apps
74 68 #-----------------------------------------------------------------------------
75 69
76 70
77 71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 72 'IPython.parallel.apps.ipclusterapp'
79 73 ))
80 74
81 75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 76 'IPython.parallel.apps.ipengineapp'
83 77 ))
84 78
85 79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 80 'IPython.parallel.apps.ipcontrollerapp'
87 81 ))
88 82
89 83 #-----------------------------------------------------------------------------
90 84 # Base launchers and errors
91 85 #-----------------------------------------------------------------------------
92 86
93 87
94 88 class LauncherError(Exception):
95 89 pass
96 90
97 91
98 92 class ProcessStateError(LauncherError):
99 93 pass
100 94
101 95
102 96 class UnknownStatus(LauncherError):
103 97 pass
104 98
105 99
106 100 class BaseLauncher(LoggingFactory):
107 101 """An asbtraction for starting, stopping and signaling a process."""
108 102
109 103 # In all of the launchers, the work_dir is where child processes will be
110 104 # run. This will usually be the cluster_dir, but may not be. Any work_dir
111 105 # passed into the __init__ method will override the config value.
112 106 # This should not be used to set the work_dir for the actual engine
113 107 # and controller. Instead, use their own config files or the
114 108 # controller_args, engine_args attributes of the launchers to add
115 109 # the --work-dir option.
116 110 work_dir = Unicode(u'.')
117 111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 112
119 113 start_data = Any()
120 114 stop_data = Any()
121 115
122 116 def _loop_default(self):
123 117 return ioloop.IOLoop.instance()
124 118
125 119 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 121 self.state = 'before' # can be before, running, after
128 122 self.stop_callbacks = []
129 123 self.start_data = None
130 124 self.stop_data = None
131 125
132 126 @property
133 127 def args(self):
134 128 """A list of cmd and args that will be used to start the process.
135 129
136 130 This is what is passed to :func:`spawnProcess` and the first element
137 131 will be the process name.
138 132 """
139 133 return self.find_args()
140 134
141 135 def find_args(self):
142 136 """The ``.args`` property calls this to find the args list.
143 137
144 138 Subclasses should implement this to construct the cmd and args.
145 139 """
146 140 raise NotImplementedError('find_args must be implemented in a subclass')
147 141
148 142 @property
149 143 def arg_str(self):
150 144 """The string form of the program arguments."""
151 145 return ' '.join(self.args)
152 146
153 147 @property
154 148 def running(self):
155 149 """Am I running."""
156 150 if self.state == 'running':
157 151 return True
158 152 else:
159 153 return False
160 154
161 155 def start(self):
162 156 """Start the process.
163 157
164 158 This must return a deferred that fires with information about the
165 159 process starting (like a pid, job id, etc.).
166 160 """
167 161 raise NotImplementedError('start must be implemented in a subclass')
168 162
169 163 def stop(self):
170 164 """Stop the process and notify observers of stopping.
171 165
172 166 This must return a deferred that fires with information about the
173 167 process stopping, like errors that occur while the process is
174 168 attempting to be shut down. This deferred won't fire when the process
175 169 actually stops. To observe the actual process stopping, see
176 170 :func:`on_stop`.
177 171 """
178 172 raise NotImplementedError('stop must be implemented in a subclass')
179 173
180 174 def on_stop(self, f):
181 175 """Get a deferred that will fire when the process stops.
182 176
183 177 The deferred will fire with data that contains information about
184 178 the exit status of the process.
185 179 """
186 180 if self.state=='after':
187 181 return f(self.stop_data)
188 182 else:
189 183 self.stop_callbacks.append(f)
190 184
191 185 def notify_start(self, data):
192 186 """Call this to trigger startup actions.
193 187
194 188 This logs the process startup and sets the state to 'running'. It is
195 189 a pass-through so it can be used as a callback.
196 190 """
197 191
198 192 self.log.info('Process %r started: %r' % (self.args[0], data))
199 193 self.start_data = data
200 194 self.state = 'running'
201 195 return data
202 196
203 197 def notify_stop(self, data):
204 198 """Call this to trigger process stop actions.
205 199
206 200 This logs the process stopping and sets the state to 'after'. Call
207 201 this to trigger all the deferreds from :func:`on_stop`."""
208 202
209 203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
210 204 self.stop_data = data
211 205 self.state = 'after'
212 206 for i in range(len(self.stop_callbacks)):
213 207 d = self.stop_callbacks.pop()
214 208 d(data)
215 209 return data
216 210
217 211 def signal(self, sig):
218 212 """Signal the process.
219 213
220 214 Return a semi-meaningless deferred after signaling the process.
221 215
222 216 Parameters
223 217 ----------
224 218 sig : str or int
225 219 'KILL', 'INT', etc., or any signal number
226 220 """
227 221 raise NotImplementedError('signal must be implemented in a subclass')
228 222
229 223
230 224 #-----------------------------------------------------------------------------
231 225 # Local process launchers
232 226 #-----------------------------------------------------------------------------
233 227
234 228
235 229 class LocalProcessLauncher(BaseLauncher):
236 230 """Start and stop an external process in an asynchronous manner.
237 231
238 232 This will launch the external process with a working directory of
239 233 ``self.work_dir``.
240 234 """
241 235
242 236 # This is used to construct self.args, which is passed to
243 237 # spawnProcess.
244 238 cmd_and_args = List([])
245 239 poll_frequency = Int(100) # in ms
246 240
247 241 def __init__(self, work_dir=u'.', config=None, **kwargs):
248 242 super(LocalProcessLauncher, self).__init__(
249 243 work_dir=work_dir, config=config, **kwargs
250 244 )
251 245 self.process = None
252 246 self.start_deferred = None
253 247 self.poller = None
254 248
255 249 def find_args(self):
256 250 return self.cmd_and_args
257 251
258 252 def start(self):
259 253 if self.state == 'before':
260 254 self.process = Popen(self.args,
261 255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
262 256 env=os.environ,
263 257 cwd=self.work_dir
264 258 )
265 259 if WINDOWS:
266 260 self.stdout = forward_read_events(self.process.stdout)
267 261 self.stderr = forward_read_events(self.process.stderr)
268 262 else:
269 263 self.stdout = self.process.stdout.fileno()
270 264 self.stderr = self.process.stderr.fileno()
271 265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
272 266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
273 267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
274 268 self.poller.start()
275 269 self.notify_start(self.process.pid)
276 270 else:
277 271 s = 'The process was already started and has state: %r' % self.state
278 272 raise ProcessStateError(s)
279 273
280 274 def stop(self):
281 275 return self.interrupt_then_kill()
282 276
283 277 def signal(self, sig):
284 278 if self.state == 'running':
279 if WINDOWS and sig != SIGINT:
280 # use Windows tree-kill for better child cleanup
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 else:
285 283 self.process.send_signal(sig)
286 284
287 285 def interrupt_then_kill(self, delay=2.0):
288 286 """Send INT, wait a delay and then send KILL."""
287 try:
289 288 self.signal(SIGINT)
289 except Exception:
290 self.log.debug("interrupt failed")
291 pass
290 292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
291 293 self.killer.start()
292 294
293 295 # callbacks, etc:
294 296
295 297 def handle_stdout(self, fd, events):
296 298 if WINDOWS:
297 299 line = self.stdout.recv()
298 300 else:
299 301 line = self.process.stdout.readline()
300 302 # a stopped process will be readable but return empty strings
301 303 if line:
302 304 self.log.info(line[:-1])
303 305 else:
304 306 self.poll()
305 307
306 308 def handle_stderr(self, fd, events):
307 309 if WINDOWS:
308 310 line = self.stderr.recv()
309 311 else:
310 312 line = self.process.stderr.readline()
311 313 # a stopped process will be readable but return empty strings
312 314 if line:
313 315 self.log.error(line[:-1])
314 316 else:
315 317 self.poll()
316 318
317 319 def poll(self):
318 320 status = self.process.poll()
319 321 if status is not None:
320 322 self.poller.stop()
321 323 self.loop.remove_handler(self.stdout)
322 324 self.loop.remove_handler(self.stderr)
323 325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
324 326 return status
325 327
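A note on the lifecycle above: it is callback-driven rather than blocking. start() calls notify_start(), and observers register for shutdown with on_stop(f), which fires immediately if the process has already exited. A minimal sketch of driving LocalProcessLauncher from the caller's side (the callback and command are hypothetical, and notify_stop only fires once the ioloop's poller observes the exit):

    def on_child_stopped(data):
        # data is whatever notify_stop() received, e.g. dict(exit_code=..., pid=...)
        print 'launcher stopped:', data

    launcher = LocalProcessLauncher(cmd_and_args=['python', '-c', 'pass'])
    launcher.on_stop(on_child_stopped)   # queued until notify_stop() runs
    launcher.start()                     # spawns the child, calls notify_start(pid)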
326 328 class LocalControllerLauncher(LocalProcessLauncher):
327 329 """Launch a controller as a regular external process."""
328 330
329 331 controller_cmd = List(ipcontroller_cmd_argv, config=True)
330 332 # Command line arguments to ipcontroller.
331 333 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
332 334
333 335 def find_args(self):
334 336 return self.controller_cmd + self.controller_args
335 337
336 338 def start(self, cluster_dir):
337 339 """Start the controller by cluster_dir."""
338 340 self.controller_args.extend(['--cluster-dir', cluster_dir])
339 341 self.cluster_dir = unicode(cluster_dir)
340 342 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
341 343 return super(LocalControllerLauncher, self).start()
342 344
343 345
344 346 class LocalEngineLauncher(LocalProcessLauncher):
345 347 """Launch a single engine as a regular externall process."""
346 348
347 349 engine_cmd = List(ipengine_cmd_argv, config=True)
348 350 # Command line arguments for ipengine.
349 351 engine_args = List(
350 352 ['--log-to-file','--log-level', str(logging.INFO)], config=True
351 353 )
352 354
353 355 def find_args(self):
354 356 return self.engine_cmd + self.engine_args
355 357
356 358 def start(self, cluster_dir):
357 359 """Start the engine by cluster_dir."""
358 360 self.engine_args.extend(['--cluster-dir', cluster_dir])
359 361 self.cluster_dir = unicode(cluster_dir)
360 362 return super(LocalEngineLauncher, self).start()
361 363
362 364
363 365 class LocalEngineSetLauncher(BaseLauncher):
364 366 """Launch a set of engines as regular external processes."""
365 367
366 368 # Command line arguments for ipengine.
367 369 engine_args = List(
368 370 ['--log-to-file','--log-level', str(logging.INFO)], config=True
369 371 )
370 372 # launcher class
371 373 launcher_class = LocalEngineLauncher
372 374
373 375 launchers = Dict()
374 376 stop_data = Dict()
375 377
376 378 def __init__(self, work_dir=u'.', config=None, **kwargs):
377 379 super(LocalEngineSetLauncher, self).__init__(
378 380 work_dir=work_dir, config=config, **kwargs
379 381 )
380 382 self.stop_data = {}
381 383
382 384 def start(self, n, cluster_dir):
383 385 """Start n engines by profile or cluster_dir."""
384 386 self.cluster_dir = unicode(cluster_dir)
385 387 dlist = []
386 388 for i in range(n):
387 389 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
388 390 # Copy the engine args over to each engine launcher.
389 391 el.engine_args = copy.deepcopy(self.engine_args)
390 392 el.on_stop(self._notice_engine_stopped)
391 393 d = el.start(cluster_dir)
392 394 if i==0:
393 395 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
394 396 self.launchers[i] = el
395 397 dlist.append(d)
396 398 self.notify_start(dlist)
397 399 # The consumeErrors here could be dangerous
398 400 # dfinal = gatherBoth(dlist, consumeErrors=True)
399 401 # dfinal.addCallback(self.notify_start)
400 402 return dlist
401 403
402 404 def find_args(self):
403 405 return ['engine set']
404 406
405 407 def signal(self, sig):
406 408 dlist = []
407 409 for el in self.launchers.itervalues():
408 410 d = el.signal(sig)
409 411 dlist.append(d)
410 412 # dfinal = gatherBoth(dlist, consumeErrors=True)
411 413 return dlist
412 414
413 415 def interrupt_then_kill(self, delay=1.0):
414 416 dlist = []
415 417 for el in self.launchers.itervalues():
416 418 d = el.interrupt_then_kill(delay)
417 419 dlist.append(d)
418 420 # dfinal = gatherBoth(dlist, consumeErrors=True)
419 421 return dlist
420 422
421 423 def stop(self):
422 424 return self.interrupt_then_kill()
423 425
424 426 def _notice_engine_stopped(self, data):
425 427 pid = data['pid']
426 428 for idx,el in self.launchers.iteritems():
427 429 if el.process.pid == pid:
428 430 break
429 431 self.launchers.pop(idx)
430 432 self.stop_data[idx] = data
431 433 if not self.launchers:
432 434 self.notify_stop(self.stop_data)
433 435
434 436
435 437 #-----------------------------------------------------------------------------
436 438 # MPIExec launchers
437 439 #-----------------------------------------------------------------------------
438 440
439 441
440 442 class MPIExecLauncher(LocalProcessLauncher):
441 443 """Launch an external process using mpiexec."""
442 444
443 445 # The mpiexec command to use in starting the process.
444 446 mpi_cmd = List(['mpiexec'], config=True)
445 447 # The command line arguments to pass to mpiexec.
446 448 mpi_args = List([], config=True)
447 449 # The program to start using mpiexec.
448 450 program = List(['date'], config=True)
449 451 # The command line argument to the program.
450 452 program_args = List([], config=True)
451 453 # The number of instances of the program to start.
452 454 n = Int(1, config=True)
453 455
454 456 def find_args(self):
455 457 """Build self.args using all the fields."""
456 458 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
457 459 self.program + self.program_args
458 460
459 461 def start(self, n):
460 462 """Start n instances of the program using mpiexec."""
461 463 self.n = n
462 464 return super(MPIExecLauncher, self).start()
463 465
464 466
465 467 class MPIExecControllerLauncher(MPIExecLauncher):
466 468 """Launch a controller using mpiexec."""
467 469
468 470 controller_cmd = List(ipcontroller_cmd_argv, config=True)
469 471 # Command line arguments to ipcontroller.
470 472 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
471 473 n = Int(1, config=False)
472 474
473 475 def start(self, cluster_dir):
474 476 """Start the controller by cluster_dir."""
475 477 self.controller_args.extend(['--cluster-dir', cluster_dir])
476 478 self.cluster_dir = unicode(cluster_dir)
477 479 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
478 480 return super(MPIExecControllerLauncher, self).start(1)
479 481
480 482 def find_args(self):
481 483 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
482 484 self.controller_cmd + self.controller_args
483 485
484 486
485 487 class MPIExecEngineSetLauncher(MPIExecLauncher):
486 488
487 489 program = List(ipengine_cmd_argv, config=True)
488 490 # Command line arguments for ipengine.
489 491 program_args = List(
490 492 ['--log-to-file','--log-level', str(logging.INFO)], config=True
491 493 )
492 494 n = Int(1, config=True)
493 495
494 496 def start(self, n, cluster_dir):
495 497 """Start n engines by profile or cluster_dir."""
496 498 self.program_args.extend(['--cluster-dir', cluster_dir])
497 499 self.cluster_dir = unicode(cluster_dir)
498 500 self.n = n
499 501 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
500 502 return super(MPIExecEngineSetLauncher, self).start(n)
501 503
502 504 #-----------------------------------------------------------------------------
503 505 # SSH launchers
504 506 #-----------------------------------------------------------------------------
505 507
506 508 # TODO: Get SSH Launcher working again.
507 509
508 510 class SSHLauncher(LocalProcessLauncher):
509 511 """A minimal launcher for ssh.
510 512
511 513 To be useful this will probably have to be extended to use the ``sshx``
512 514 idea for environment variables. There could be other things this needs
513 515 as well.
514 516 """
515 517
516 518 ssh_cmd = List(['ssh'], config=True)
517 519 ssh_args = List(['-tt'], config=True)
518 520 program = List(['date'], config=True)
519 521 program_args = List([], config=True)
520 522 hostname = CUnicode('', config=True)
521 523 user = CUnicode('', config=True)
522 524 location = CUnicode('')
523 525
524 526 def _hostname_changed(self, name, old, new):
525 527 if self.user:
526 528 self.location = u'%s@%s' % (self.user, new)
527 529 else:
528 530 self.location = new
529 531
530 532 def _user_changed(self, name, old, new):
531 533 self.location = u'%s@%s' % (new, self.hostname)
532 534
533 535 def find_args(self):
534 536 return self.ssh_cmd + self.ssh_args + [self.location] + \
535 537 self.program + self.program_args
536 538
537 539 def start(self, cluster_dir, hostname=None, user=None):
538 540 self.cluster_dir = unicode(cluster_dir)
539 541 if hostname is not None:
540 542 self.hostname = hostname
541 543 if user is not None:
542 544 self.user = user
543 545
544 546 return super(SSHLauncher, self).start()
545 547
546 548 def signal(self, sig):
547 549 if self.state == 'running':
548 550 # send escaped ssh connection-closer
549 551 self.process.stdin.write('~.')
550 552 self.process.stdin.flush()
551 553
552 554
553 555
554 556 class SSHControllerLauncher(SSHLauncher):
555 557
556 558 program = List(ipcontroller_cmd_argv, config=True)
557 559 # Command line arguments to ipcontroller.
558 560 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
559 561
560 562
561 563 class SSHEngineLauncher(SSHLauncher):
562 564 program = List(ipengine_cmd_argv, config=True)
563 565 # Command line arguments for ipengine.
564 566 program_args = List(
565 567 ['--log-to-file','--log-level', str(logging.INFO)], config=True
566 568 )
567 569
568 570 class SSHEngineSetLauncher(LocalEngineSetLauncher):
569 571 launcher_class = SSHEngineLauncher
570 572 engines = Dict(config=True)
571 573
572 574 def start(self, n, cluster_dir):
573 575 """Start engines by profile or cluster_dir.
574 576 `n` is ignored, and the `engines` config property is used instead.
575 577 """
576 578
577 579 self.cluster_dir = unicode(cluster_dir)
578 580 dlist = []
579 581 for host, n in self.engines.iteritems():
580 582 if isinstance(n, (tuple, list)):
581 583 n, args = n
582 584 else:
583 585 args = copy.deepcopy(self.engine_args)
584 586
585 587 if '@' in host:
586 588 user,host = host.split('@',1)
587 589 else:
588 590 user=None
589 591 for i in range(n):
590 592 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
591 593
592 594 # Copy the engine args over to each engine launcher.
593 595
594 596 el.program_args = args
595 597 el.on_stop(self._notice_engine_stopped)
596 598 d = el.start(cluster_dir, user=user, hostname=host)
597 599 if i==0:
598 600 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
599 601 self.launchers[host+str(i)] = el
600 602 dlist.append(d)
601 603 self.notify_start(dlist)
602 604 return dlist
603 605
604 606
605 607
606 608 #-----------------------------------------------------------------------------
607 609 # Windows HPC Server 2008 scheduler launchers
608 610 #-----------------------------------------------------------------------------
609 611
610 612
611 613 # This is only used on Windows.
612 614 def find_job_cmd():
613 615 if WINDOWS:
614 616 try:
615 617 return find_cmd('job')
616 618 except (FindCmdError, ImportError):
617 619 # ImportError will be raised if win32api is not installed
618 620 return 'job'
619 621 else:
620 622 return 'job'
621 623
622 624
623 625 class WindowsHPCLauncher(BaseLauncher):
624 626
625 627 # A regular expression used to get the job id from the output of the
626 628 # submit_command.
627 629 job_id_regexp = Str(r'\d+', config=True)
628 630 # The filename of the instantiated job script.
629 631 job_file_name = CUnicode(u'ipython_job.xml', config=True)
630 632 # The full path to the instantiated job script. This gets made dynamically
631 633 # by combining the work_dir with the job_file_name.
632 634 job_file = CUnicode(u'')
633 635 # The hostname of the scheduler to submit the job to
634 636 scheduler = CUnicode('', config=True)
635 637 job_cmd = CUnicode(find_job_cmd(), config=True)
636 638
637 639 def __init__(self, work_dir=u'.', config=None, **kwargs):
638 640 super(WindowsHPCLauncher, self).__init__(
639 641 work_dir=work_dir, config=config, **kwargs
640 642 )
641 643
642 644 @property
643 645 def job_file(self):
644 646 return os.path.join(self.work_dir, self.job_file_name)
645 647
646 648 def write_job_file(self, n):
647 649 raise NotImplementedError("Implement write_job_file in a subclass.")
648 650
649 651 def find_args(self):
650 652 return [u'job.exe']
651 653
652 654 def parse_job_id(self, output):
653 655 """Take the output of the submit command and return the job id."""
654 656 m = re.search(self.job_id_regexp, output)
655 657 if m is not None:
656 658 job_id = m.group()
657 659 else:
658 660 raise LauncherError("Job id couldn't be determined: %s" % output)
659 661 self.job_id = job_id
660 662 self.log.info('Job started with job id: %r' % job_id)
661 663 return job_id
662 664
663 665 def start(self, n):
664 666 """Start n copies of the process using the Win HPC job scheduler."""
665 667 self.write_job_file(n)
666 668 args = [
667 669 'submit',
668 670 '/jobfile:%s' % self.job_file,
669 671 '/scheduler:%s' % self.scheduler
670 672 ]
671 673 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
672 674 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
673 675 output = check_output([self.job_cmd]+args,
674 676 env=os.environ,
675 677 cwd=self.work_dir,
676 678 stderr=STDOUT
677 679 )
678 680 job_id = self.parse_job_id(output)
679 681 self.notify_start(job_id)
680 682 return job_id
681 683
682 684 def stop(self):
683 685 args = [
684 686 'cancel',
685 687 self.job_id,
686 688 '/scheduler:%s' % self.scheduler
687 689 ]
688 690 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
689 691 try:
690 692 output = check_output([self.job_cmd]+args,
691 693 env=os.environ,
692 694 cwd=self.work_dir,
693 695 stderr=STDOUT
694 696 )
695 697 except:
696 698 output = 'The job already appears to be stopped: %r' % self.job_id
697 699 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
698 700 return output
699 701
700 702
701 703 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
702 704
703 705 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
704 706 extra_args = List([], config=False)
705 707
706 708 def write_job_file(self, n):
707 709 job = IPControllerJob(config=self.config)
708 710
709 711 t = IPControllerTask(config=self.config)
710 712 # The task's work directory is *not* the actual work directory of
711 713 # the controller. It is used as the base path for the stdout/stderr
712 714 # files that the scheduler redirects to.
713 715 t.work_directory = self.cluster_dir
714 716 # Add the --cluster-dir args from self.start().
715 717 t.controller_args.extend(self.extra_args)
716 718 job.add_task(t)
717 719
718 720 self.log.info("Writing job description file: %s" % self.job_file)
719 721 job.write(self.job_file)
720 722
721 723 @property
722 724 def job_file(self):
723 725 return os.path.join(self.cluster_dir, self.job_file_name)
724 726
725 727 def start(self, cluster_dir):
726 728 """Start the controller by cluster_dir."""
727 729 self.extra_args = ['--cluster-dir', cluster_dir]
728 730 self.cluster_dir = unicode(cluster_dir)
729 731 return super(WindowsHPCControllerLauncher, self).start(1)
730 732
731 733
732 734 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
733 735
734 736 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
735 737 extra_args = List([], config=False)
736 738
737 739 def write_job_file(self, n):
738 740 job = IPEngineSetJob(config=self.config)
739 741
740 742 for i in range(n):
741 743 t = IPEngineTask(config=self.config)
742 744 # The task's work directory is *not* the actual work directory of
743 745 # the engine. It is used as the base path for the stdout/stderr
744 746 # files that the scheduler redirects to.
745 747 t.work_directory = self.cluster_dir
746 748 # Add the --cluster-dir args from self.start().
747 749 t.engine_args.extend(self.extra_args)
748 750 job.add_task(t)
749 751
750 752 self.log.info("Writing job description file: %s" % self.job_file)
751 753 job.write(self.job_file)
752 754
753 755 @property
754 756 def job_file(self):
755 757 return os.path.join(self.cluster_dir, self.job_file_name)
756 758
757 759 def start(self, n, cluster_dir):
758 760 """Start the controller by cluster_dir."""
759 761 self.extra_args = ['--cluster-dir', cluster_dir]
760 762 self.cluster_dir = unicode(cluster_dir)
761 763 return super(WindowsHPCEngineSetLauncher, self).start(n)
762 764
763 765
764 766 #-----------------------------------------------------------------------------
765 767 # Batch (PBS) system launchers
766 768 #-----------------------------------------------------------------------------
767 769
768 770 class BatchSystemLauncher(BaseLauncher):
769 771 """Launch an external process using a batch system.
770 772
771 773 This class is designed to work with UNIX batch systems like PBS, LSF,
772 774 GridEngine, etc. The overall model is that there are different commands
773 775 like qsub, qdel, etc. that handle the starting and stopping of the process.
774 776
775 777 This class also has the notion of a batch script. The ``batch_template``
776 778 attribute can be set to a string that is a template for the batch script.
777 779 This template is instantiated using Itpl. Thus the template can use
778 780 ${n} for the number of instances. Subclasses can add additional variables
779 781 to the template dict.
780 782 """
781 783
782 784 # Subclasses must fill these in. See PBSEngineSet
783 785 # The name of the command line program used to submit jobs.
784 786 submit_command = List([''], config=True)
785 787 # The name of the command line program used to delete jobs.
786 788 delete_command = List([''], config=True)
787 789 # A regular expression used to get the job id from the output of the
788 790 # submit_command.
789 791 job_id_regexp = CUnicode('', config=True)
790 792 # The string that is the batch script template itself.
791 793 batch_template = CUnicode('', config=True)
792 794 # The file that contains the batch template
793 795 batch_template_file = CUnicode(u'', config=True)
794 796 # The filename of the instantiated batch script.
795 797 batch_file_name = CUnicode(u'batch_script', config=True)
796 798 # The PBS Queue
797 799 queue = CUnicode(u'', config=True)
798 800
799 801 # not configurable, override in subclasses
800 802 # PBS Job Array regex
801 803 job_array_regexp = CUnicode('')
802 804 job_array_template = CUnicode('')
803 805 # PBS Queue regex
804 806 queue_regexp = CUnicode('')
805 807 queue_template = CUnicode('')
806 808 # The default batch template, override in subclasses
807 809 default_template = CUnicode('')
808 810 # The full path to the instantiated batch script.
809 811 batch_file = CUnicode(u'')
810 812 # the format dict used with batch_template:
811 813 context = Dict()
812 814
813 815
814 816 def find_args(self):
815 817 return self.submit_command + [self.batch_file]
816 818
817 819 def __init__(self, work_dir=u'.', config=None, **kwargs):
818 820 super(BatchSystemLauncher, self).__init__(
819 821 work_dir=work_dir, config=config, **kwargs
820 822 )
821 823 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
822 824
823 825 def parse_job_id(self, output):
824 826 """Take the output of the submit command and return the job id."""
825 827 m = re.search(self.job_id_regexp, output)
826 828 if m is not None:
827 829 job_id = m.group()
828 830 else:
829 831 raise LauncherError("Job id couldn't be determined: %s" % output)
830 832 self.job_id = job_id
831 833 self.log.info('Job submitted with job id: %r' % job_id)
832 834 return job_id
833 835
834 836 def write_batch_script(self, n):
835 837 """Instantiate and write the batch script to the work_dir."""
836 838 self.context['n'] = n
837 839 self.context['queue'] = self.queue
838 840 # print self.context
839 841 # first priority is batch_template if set
840 842 if self.batch_template_file and not self.batch_template:
841 843 # second priority is batch_template_file
842 844 with open(self.batch_template_file) as f:
843 845 self.batch_template = f.read()
844 846 if not self.batch_template:
845 847 # third (last) priority is default_template
846 848 self.batch_template = self.default_template
847 849
848 850 regex = re.compile(self.job_array_regexp)
849 851 # print regex.search(self.batch_template)
850 852 if not regex.search(self.batch_template):
851 853 self.log.info("adding job array settings to batch script")
852 854 firstline, rest = self.batch_template.split('\n',1)
853 855 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
854 856
855 857 regex = re.compile(self.queue_regexp)
856 858 # print regex.search(self.batch_template)
857 859 if self.queue and not regex.search(self.batch_template):
858 860 self.log.info("adding PBS queue settings to batch script")
859 861 firstline, rest = self.batch_template.split('\n',1)
860 862 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
861 863
862 864 script_as_string = Itpl.itplns(self.batch_template, self.context)
863 865 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
864 866
865 867 with open(self.batch_file, 'w') as f:
866 868 f.write(script_as_string)
867 869 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
868 870
869 871 def start(self, n, cluster_dir):
870 872 """Start n copies of the process using a batch system."""
871 873 # Here we save cluster_dir in the context so it
872 874 # can be used in the batch script template as
873 875 # ${cluster_dir}
874 876 self.context['cluster_dir'] = cluster_dir
875 877 self.cluster_dir = unicode(cluster_dir)
876 878 self.write_batch_script(n)
877 879 output = check_output(self.args, env=os.environ)
878 880
879 881 job_id = self.parse_job_id(output)
880 882 self.notify_start(job_id)
881 883 return job_id
882 884
883 885 def stop(self):
884 886 output = check_output(self.delete_command+[self.job_id], env=os.environ)
885 887 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
886 888 return output
887 889
888 890
889 891 class PBSLauncher(BatchSystemLauncher):
890 892 """A BatchSystemLauncher subclass for PBS."""
891 893
892 894 submit_command = List(['qsub'], config=True)
893 895 delete_command = List(['qdel'], config=True)
894 896 job_id_regexp = CUnicode(r'\d+', config=True)
895 897
896 898 batch_file = CUnicode(u'')
897 899 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
898 900 job_array_template = CUnicode('#PBS -t 1-$n')
899 901 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
900 902 queue_template = CUnicode('#PBS -q $queue')
901 903
902 904
903 905 class PBSControllerLauncher(PBSLauncher):
904 906 """Launch a controller using PBS."""
905 907
906 908 batch_file_name = CUnicode(u'pbs_controller', config=True)
907 909 default_template= CUnicode("""#!/bin/sh
908 910 #PBS -V
909 911 #PBS -N ipcontroller
910 912 %s --log-to-file --cluster-dir $cluster_dir
911 913 """%(' '.join(ipcontroller_cmd_argv)))
912 914
913 915 def start(self, cluster_dir):
914 916 """Start the controller by profile or cluster_dir."""
915 917 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
916 918 return super(PBSControllerLauncher, self).start(1, cluster_dir)
917 919
918 920
919 921 class PBSEngineSetLauncher(PBSLauncher):
920 922 """Launch Engines using PBS"""
921 923 batch_file_name = CUnicode(u'pbs_engines', config=True)
922 924 default_template= CUnicode(u"""#!/bin/sh
923 925 #PBS -V
924 926 #PBS -N ipengine
925 927 %s --cluster-dir $cluster_dir
926 928 """%(' '.join(ipengine_cmd_argv)))
927 929
928 930 def start(self, n, cluster_dir):
929 931 """Start n engines by profile or cluster_dir."""
930 932 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
931 933 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
932 934
933 935 # SGE is very similar to PBS
934 936
935 937 class SGELauncher(PBSLauncher):
936 938 """Sun GridEngine is a PBS clone with slightly different syntax"""
937 939 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
938 940 job_array_template = CUnicode('#$$ -t 1-$n')
939 941 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
940 942 queue_template = CUnicode('#$$ -q $queue')
941 943
942 944 class SGEControllerLauncher(SGELauncher):
943 945 """Launch a controller using SGE."""
944 946
945 947 batch_file_name = CUnicode(u'sge_controller', config=True)
946 948 default_template= CUnicode(u"""#$$ -V
947 949 #$$ -S /bin/sh
948 950 #$$ -N ipcontroller
949 951 %s --log-to-file --cluster-dir $cluster_dir
950 952 """%(' '.join(ipcontroller_cmd_argv)))
951 953
952 954 def start(self, cluster_dir):
953 955 """Start the controller by profile or cluster_dir."""
954 956 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 957 return super(SGEControllerLauncher, self).start(1, cluster_dir)
956 958
957 959 class SGEEngineSetLauncher(SGELauncher):
958 960 """Launch Engines with SGE"""
959 961 batch_file_name = CUnicode(u'sge_engines', config=True)
960 962 default_template = CUnicode("""#$$ -V
961 963 #$$ -S /bin/sh
962 964 #$$ -N ipengine
963 965 %s --cluster-dir $cluster_dir
964 966 """%(' '.join(ipengine_cmd_argv)))
965 967
966 968 def start(self, n, cluster_dir):
967 969 """Start n engines by profile or cluster_dir."""
968 970 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
969 971 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
970 972
971 973
972 974 #-----------------------------------------------------------------------------
973 975 # A launcher for ipcluster itself!
974 976 #-----------------------------------------------------------------------------
975 977
976 978
977 979 class IPClusterLauncher(LocalProcessLauncher):
978 980 """Launch the ipcluster program in an external process."""
979 981
980 982 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
981 983 # Command line arguments to pass to ipcluster.
982 984 ipcluster_args = List(
983 985 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
984 986 ipcluster_subcommand = Str('start')
985 987 ipcluster_n = Int(2)
986 988
987 989 def find_args(self):
988 990 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
989 991 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
990 992
991 993 def start(self):
992 994 self.log.info("Starting ipcluster: %r" % self.args)
993 995 return super(IPClusterLauncher, self).start()
994 996
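The centerpiece of this commit is the new signal path in LocalProcessLauncher.signal above: on Windows, anything other than SIGINT is routed through taskkill so the whole child process tree gets cleaned up, and CTRL_C_EVENT stands in for SIGINT where the stdlib provides it. A standalone, stdlib-only sketch of that fallback logic (the signal_process helper is hypothetical, not part of this diff):

    import os
    from signal import SIGTERM
    from subprocess import check_output
    try:
        from signal import SIGKILL            # POSIX only
    except ImportError:
        SIGKILL = SIGTERM                     # Windows has no SIGKILL
    try:
        from signal import CTRL_C_EVENT as SIGINT   # Windows, Python >= 2.7, 3.2
    except ImportError:
        from signal import SIGINT

    def signal_process(p, sig):
        """Send sig to Popen p; on Windows, tree-kill via taskkill."""
        if os.name == 'nt' and sig != SIGINT:
            # -t kills the child tree, -f forces termination
            check_output(['taskkill', '-pid', str(p.pid), '-t', '-f'])
        else:
            p.send_signal(sig)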
@@ -1,316 +1,314 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Job and task components for writing .xml files that the Windows HPC Server
5 5 2008 can use to start jobs.
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2009 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 from __future__ import with_statement
20
21 19 import os
22 20 import re
23 21 import uuid
24 22
25 23 from xml.etree import ElementTree as ET
26 24
27 25 from IPython.config.configurable import Configurable
28 26 from IPython.utils.traitlets import (
29 27 Str, Int, List, Instance,
30 28 Enum, Bool, CStr
31 29 )
32 30
33 31 #-----------------------------------------------------------------------------
34 32 # Job and Task classes
35 33 #-----------------------------------------------------------------------------
36 34
37 35
38 36 def as_str(value):
39 37 if isinstance(value, str):
40 38 return value
41 39 elif isinstance(value, bool):
42 40 if value:
43 41 return 'true'
44 42 else:
45 43 return 'false'
46 44 elif isinstance(value, (int, float)):
47 45 return repr(value)
48 46 else:
49 47 return value
50 48
51 49
52 50 def indent(elem, level=0):
53 51 i = "\n" + level*" "
54 52 if len(elem):
55 53 if not elem.text or not elem.text.strip():
56 54 elem.text = i + " "
57 55 if not elem.tail or not elem.tail.strip():
58 56 elem.tail = i
59 57 for elem in elem:
60 58 indent(elem, level+1)
61 59 if not elem.tail or not elem.tail.strip():
62 60 elem.tail = i
63 61 else:
64 62 if level and (not elem.tail or not elem.tail.strip()):
65 63 elem.tail = i
66 64
67 65
68 66 def find_username():
69 67 domain = os.environ.get('USERDOMAIN')
70 68 username = os.environ.get('USERNAME','')
71 69 if domain is None:
72 70 return username
73 71 else:
74 72 return '%s\\%s' % (domain, username)
75 73
76 74
77 75 class WinHPCJob(Configurable):
78 76
79 77 job_id = Str('')
80 78 job_name = Str('MyJob', config=True)
81 79 min_cores = Int(1, config=True)
82 80 max_cores = Int(1, config=True)
83 81 min_sockets = Int(1, config=True)
84 82 max_sockets = Int(1, config=True)
85 83 min_nodes = Int(1, config=True)
86 84 max_nodes = Int(1, config=True)
87 85 unit_type = Str("Core", config=True)
88 86 auto_calculate_min = Bool(True, config=True)
89 87 auto_calculate_max = Bool(True, config=True)
90 88 run_until_canceled = Bool(False, config=True)
91 89 is_exclusive = Bool(False, config=True)
92 90 username = Str(find_username(), config=True)
93 91 job_type = Str('Batch', config=True)
94 92 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
95 93 default_value='Highest', config=True)
96 94 requested_nodes = Str('', config=True)
97 95 project = Str('IPython', config=True)
98 96 xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
99 97 version = Str("2.000")
100 98 tasks = List([])
101 99
102 100 @property
103 101 def owner(self):
104 102 return self.username
105 103
106 104 def _write_attr(self, root, attr, key):
107 105 s = as_str(getattr(self, attr, ''))
108 106 if s:
109 107 root.set(key, s)
110 108
111 109 def as_element(self):
112 110 # We have to add _A_ type things to get the right order that
113 111 # the MSFT XML parser expects.
114 112 root = ET.Element('Job')
115 113 self._write_attr(root, 'version', '_A_Version')
116 114 self._write_attr(root, 'job_name', '_B_Name')
117 115 self._write_attr(root, 'unit_type', '_C_UnitType')
118 116 self._write_attr(root, 'min_cores', '_D_MinCores')
119 117 self._write_attr(root, 'max_cores', '_E_MaxCores')
120 118 self._write_attr(root, 'min_sockets', '_F_MinSockets')
121 119 self._write_attr(root, 'max_sockets', '_G_MaxSockets')
122 120 self._write_attr(root, 'min_nodes', '_H_MinNodes')
123 121 self._write_attr(root, 'max_nodes', '_I_MaxNodes')
124 122 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
125 123 self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
126 124 self._write_attr(root, 'username', '_L_UserName')
127 125 self._write_attr(root, 'job_type', '_M_JobType')
128 126 self._write_attr(root, 'priority', '_N_Priority')
129 127 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
130 128 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
131 129 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
132 130 self._write_attr(root, 'project', '_R_Project')
133 131 self._write_attr(root, 'owner', '_S_Owner')
134 132 self._write_attr(root, 'xmlns', '_T_xmlns')
135 133 dependencies = ET.SubElement(root, "Dependencies")
136 134 etasks = ET.SubElement(root, "Tasks")
137 135 for t in self.tasks:
138 136 etasks.append(t.as_element())
139 137 return root
140 138
141 139 def tostring(self):
142 140 """Return the string representation of the job description XML."""
143 141 root = self.as_element()
144 142 indent(root)
145 143 txt = ET.tostring(root, encoding="utf-8")
146 144 # Now remove the tokens used to order the attributes.
147 145 txt = re.sub(r'_[A-Z]_','',txt)
148 146 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
149 147 return txt
150 148
151 149 def write(self, filename):
152 150 """Write the XML job description to a file."""
153 151 txt = self.tostring()
154 152 with open(filename, 'w') as f:
155 153 f.write(txt)
156 154
157 155 def add_task(self, task):
158 156 """Add a task to the job.
159 157
160 158 Parameters
161 159 ----------
162 160 task : :class:`WinHPCTask`
163 161 The task object to add.
164 162 """
165 163 self.tasks.append(task)
166 164
167 165
168 166 class WinHPCTask(Configurable):
169 167
170 168 task_id = Str('')
171 169 task_name = Str('')
172 170 version = Str("2.000")
173 171 min_cores = Int(1, config=True)
174 172 max_cores = Int(1, config=True)
175 173 min_sockets = Int(1, config=True)
176 174 max_sockets = Int(1, config=True)
177 175 min_nodes = Int(1, config=True)
178 176 max_nodes = Int(1, config=True)
179 177 unit_type = Str("Core", config=True)
180 178 command_line = CStr('', config=True)
181 179 work_directory = CStr('', config=True)
182 180 is_rerunnable = Bool(True, config=True)
183 181 std_out_file_path = CStr('', config=True)
184 182 std_err_file_path = CStr('', config=True)
185 183 is_parametric = Bool(False, config=True)
186 184 environment_variables = Instance(dict, args=(), config=True)
187 185
188 186 def _write_attr(self, root, attr, key):
189 187 s = as_str(getattr(self, attr, ''))
190 188 if s:
191 189 root.set(key, s)
192 190
193 191 def as_element(self):
194 192 root = ET.Element('Task')
195 193 self._write_attr(root, 'version', '_A_Version')
196 194 self._write_attr(root, 'task_name', '_B_Name')
197 195 self._write_attr(root, 'min_cores', '_C_MinCores')
198 196 self._write_attr(root, 'max_cores', '_D_MaxCores')
199 197 self._write_attr(root, 'min_sockets', '_E_MinSockets')
200 198 self._write_attr(root, 'max_sockets', '_F_MaxSockets')
201 199 self._write_attr(root, 'min_nodes', '_G_MinNodes')
202 200 self._write_attr(root, 'max_nodes', '_H_MaxNodes')
203 201 self._write_attr(root, 'command_line', '_I_CommandLine')
204 202 self._write_attr(root, 'work_directory', '_J_WorkDirectory')
205 203 self._write_attr(root, 'is_rerunnable', '_K_IsRerunnable')
206 204 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
207 205 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
208 206 self._write_attr(root, 'is_parametric', '_N_IsParametric')
209 207 self._write_attr(root, 'unit_type', '_O_UnitType')
210 208 root.append(self.get_env_vars())
211 209 return root
212 210
213 211 def get_env_vars(self):
214 212 env_vars = ET.Element('EnvironmentVariables')
215 213 for k, v in self.environment_variables.iteritems():
216 214 variable = ET.SubElement(env_vars, "Variable")
217 215 name = ET.SubElement(variable, "Name")
218 216 name.text = k
219 217 value = ET.SubElement(variable, "Value")
220 218 value.text = v
221 219 return env_vars
222 220
223 221
224 222
225 223 # By declaring these, we can configure the controller and engine separately!
226 224
227 225 class IPControllerJob(WinHPCJob):
228 226 job_name = Str('IPController', config=False)
229 227 is_exclusive = Bool(False, config=True)
230 228 username = Str(find_username(), config=True)
231 229 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
232 230 default_value='Highest', config=True)
233 231 requested_nodes = Str('', config=True)
234 232 project = Str('IPython', config=True)
235 233
236 234
237 235 class IPEngineSetJob(WinHPCJob):
238 236 job_name = Str('IPEngineSet', config=False)
239 237 is_exclusive = Bool(False, config=True)
240 238 username = Str(find_username(), config=True)
241 239 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
242 240 default_value='Highest', config=True)
243 241 requested_nodes = Str('', config=True)
244 242 project = Str('IPython', config=True)
245 243
246 244
247 245 class IPControllerTask(WinHPCTask):
248 246
249 247 task_name = Str('IPController', config=True)
250 248 controller_cmd = List(['ipcontroller.exe'], config=True)
251 249 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
252 250 # I don't want these to be configurable
253 251 std_out_file_path = CStr('', config=False)
254 252 std_err_file_path = CStr('', config=False)
255 253 min_cores = Int(1, config=False)
256 254 max_cores = Int(1, config=False)
257 255 min_sockets = Int(1, config=False)
258 256 max_sockets = Int(1, config=False)
259 257 min_nodes = Int(1, config=False)
260 258 max_nodes = Int(1, config=False)
261 259 unit_type = Str("Core", config=False)
262 260 work_directory = CStr('', config=False)
263 261
264 262 def __init__(self, config=None):
265 263 super(IPControllerTask, self).__init__(config=config)
266 264 the_uuid = uuid.uuid1()
267 265 self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid)
268 266 self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid)
269 267
270 268 @property
271 269 def command_line(self):
272 270 return ' '.join(self.controller_cmd + self.controller_args)
273 271
274 272
275 273 class IPEngineTask(WinHPCTask):
276 274
277 275 task_name = Str('IPEngine', config=True)
278 276 engine_cmd = List(['ipengine.exe'], config=True)
279 277 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
280 278 # I don't want these to be configurable
281 279 std_out_file_path = CStr('', config=False)
282 280 std_err_file_path = CStr('', config=False)
283 281 min_cores = Int(1, config=False)
284 282 max_cores = Int(1, config=False)
285 283 min_sockets = Int(1, config=False)
286 284 max_sockets = Int(1, config=False)
287 285 min_nodes = Int(1, config=False)
288 286 max_nodes = Int(1, config=False)
289 287 unit_type = Str("Core", config=False)
290 288 work_directory = CStr('', config=False)
291 289
292 290 def __init__(self, config=None):
293 291 super(IPEngineTask,self).__init__(config=config)
294 292 the_uuid = uuid.uuid1()
295 293 self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid)
296 294 self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid)
297 295
298 296 @property
299 297 def command_line(self):
300 298 return ' '.join(self.engine_cmd + self.engine_args)
301 299
302 300
303 301 # j = WinHPCJob(None)
304 302 # j.job_name = 'IPCluster'
305 303 # j.username = 'GNET\\bgranger'
306 304 # j.requested_nodes = 'GREEN'
307 305 #
308 306 # t = WinHPCTask(None)
309 307 # t.task_name = 'Controller'
310 308 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
311 309 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
312 310 # t.std_out_file_path = 'controller-out.txt'
313 311 # t.std_err_file_path = 'controller-err.txt'
314 312 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
315 313 # j.add_task(t)
316 314
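For context on how the classes in this file are consumed by the launchers: a WinHPCJob collects WinHPCTask instances and serializes them to the XML that `job submit /jobfile:...` expects. A short sketch mirroring WindowsHPCControllerLauncher.write_job_file (the UNC path is hypothetical):

    from IPython.parallel.apps.winhpcjob import IPControllerJob, IPControllerTask

    job = IPControllerJob()                   # job-level scheduler settings
    t = IPControllerTask()                    # one controller task
    t.work_directory = r'\\server\share\cluster_default'
    t.controller_args.extend(['--cluster-dir', r'\\server\share\cluster_default'])
    job.add_task(t)
    job.write('ipcontroller_job.xml')         # then: job submit /jobfile:ipcontroller_job.xml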
@@ -1,80 +1,107 b''
1 1 """toplevel setup/teardown for parallel tests."""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import tempfile
16 16 import time
17 from subprocess import Popen, PIPE, STDOUT
17 from subprocess import Popen
18 18
19 19 from IPython.utils.path import get_ipython_dir
20 20 from IPython.parallel import Client
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 ipengine_cmd_argv,
23 ipcontroller_cmd_argv,
24 SIGKILL, ProcessStateError)
21 25
22 processes = []
23 blackhole = tempfile.TemporaryFile()
26 # globals
27 launchers = []
28 blackhole = open(os.devnull, 'w')
29
30 # Launcher class
31 class TestProcessLauncher(LocalProcessLauncher):
32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
33 def start(self):
34 if self.state == 'before':
35 self.process = Popen(self.args,
36 stdout=blackhole, stderr=blackhole,
37 env=os.environ,
38 cwd=self.work_dir
39 )
40 self.notify_start(self.process.pid)
41 self.poll = self.process.poll
42 else:
43 s = 'The process was already started and has state: %r' % self.state
44 raise ProcessStateError(s)
24 45
25 46 # nose setup/teardown
26 47
27 48 def setup():
28 cp = Popen('ipcontroller --profile iptest -r --log-level 10 --log-to-file --usethreads'.split(), stdout=blackhole, stderr=STDOUT)
29 processes.append(cp)
30 engine_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-engine.json')
31 client_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-client.json')
49 cp = TestProcessLauncher()
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
51 ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
52 cp.start()
53 launchers.append(cp)
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
55 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
56 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
32 57 tic = time.time()
33 58 while not os.path.exists(engine_json) or not os.path.exists(client_json):
34 59 if cp.poll() is not None:
35 60 print cp.poll()
36 61 raise RuntimeError("The test controller failed to start.")
37 62 elif time.time()-tic > 10:
38 63 raise RuntimeError("Timeout waiting for the test controller to start.")
39 64 time.sleep(0.1)
40 65 add_engines(1)
41 66
42 67 def add_engines(n=1, profile='iptest'):
43 68 rc = Client(profile=profile)
44 69 base = len(rc)
45 70 eps = []
46 71 for i in range(n):
47 ep = Popen(['ipengine']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
48 # ep.start()
49 processes.append(ep)
72 ep = TestProcessLauncher()
73 ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99']
74 ep.start()
75 launchers.append(ep)
50 76 eps.append(ep)
51 77 tic = time.time()
52 78 while len(rc) < base+n:
53 79 if any([ ep.poll() is not None for ep in eps ]):
54 80 raise RuntimeError("A test engine failed to start.")
55 81 elif time.time()-tic > 10:
56 82 raise RuntimeError("Timeout waiting for engines to connect.")
57 83 time.sleep(.1)
58 84 rc.spin()
59 85 rc.close()
60 86 return eps
61 87
62 88 def teardown():
63 89 time.sleep(1)
64 while processes:
65 p = processes.pop()
90 while launchers:
91 p = launchers.pop()
66 92 if p.poll() is None:
67 93 try:
68 p.terminate()
94 p.stop()
69 95 except Exception, e:
70 96 print e
71 97 pass
72 98 if p.poll() is None:
73 99 time.sleep(.25)
74 100 if p.poll() is None:
75 101 try:
76 print 'killing'
77 p.kill()
102 print 'cleaning up test process...'
103 p.signal(SIGKILL)
78 104 except:
79 105 print "couldn't shutdown process: ", p
106 blackhole.close()
80 107
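Moving the test processes from raw Popen onto TestProcessLauncher is what makes the cleanup above work: stop() sends SIGINT right away, but the delayed SIGKILL is scheduled on an ioloop that never runs under nose, hence the explicit p.signal(SIGKILL) fallback in teardown(), which becomes the taskkill tree-kill on Windows. A sketch of that lifecycle, using the names from this module and a hypothetical long-running child:

    cp = TestProcessLauncher()
    cp.cmd_and_args = ['python', '-c', 'import time; time.sleep(60)']
    cp.start()                     # notify_start(); exposes process.poll as cp.poll
    assert cp.poll() is None       # still running
    cp.stop()                      # SIGINT now; SIGKILL only fires if a loop is running
    if cp.poll() is None:
        cp.signal(SIGKILL)         # manual fallback, exactly as teardown() does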
@@ -1,128 +1,129 b''
1 1 """base class for parallel client tests"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 import sys
11 11 import tempfile
12 12 import time
13 13
14 14 from nose import SkipTest
15 15
16 16 import zmq
17 17 from zmq.tests import BaseZMQTestCase
18 18
19 19 from IPython.external.decorator import decorator
20 20
21 21 from IPython.parallel import error
22 22 from IPython.parallel import Client
23 from IPython.parallel.tests import processes,add_engines
23
24 from IPython.parallel.tests import launchers, add_engines
24 25
25 26 # simple tasks for use in apply tests
26 27
27 28 def segfault():
28 29 """this will segfault"""
29 30 import ctypes
30 31 ctypes.memset(-1,0,1)
31 32
32 33 def crash():
33 34 """from stdlib crashers in the test suite"""
34 35 import types
35 36 if sys.platform.startswith('win'):
36 37 import ctypes
37 38 ctypes.windll.kernel32.SetErrorMode(0x0002);
38 39
39 40 co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
40 41 (), (), (), '', '', 1, b'')
41 42 exec(co)
42 43
43 44 def wait(n):
44 45 """sleep for a time"""
45 46 import time
46 47 time.sleep(n)
47 48 return n
48 49
49 50 def raiser(eclass):
50 51 """raise an exception"""
51 52 raise eclass()
52 53
53 54 # test decorator for skipping tests when libraries are unavailable
54 55 def skip_without(*names):
55 56 """skip a test if some names are not importable"""
56 57 @decorator
57 58 def skip_without_names(f, *args, **kwargs):
58 59 """decorator to skip tests in the absence of numpy."""
59 60 for name in names:
60 61 try:
61 62 __import__(name)
62 63 except ImportError:
63 64 raise SkipTest
64 65 return f(*args, **kwargs)
65 66 return skip_without_names
66 67
67 68 class ClusterTestCase(BaseZMQTestCase):
68 69
69 70 def add_engines(self, n=1, block=True):
70 71 """add multiple engines to our cluster"""
71 72 self.engines.extend(add_engines(n))
72 73 if block:
73 74 self.wait_on_engines()
74 75
75 76 def wait_on_engines(self, timeout=5):
76 77 """wait for our engines to connect."""
77 78 n = len(self.engines)+self.base_engine_count
78 79 tic = time.time()
79 80 while time.time()-tic < timeout and len(self.client.ids) < n:
80 81 time.sleep(0.1)
81 82
82 83 assert not len(self.client.ids) < n, "waiting for engines timed out"
83 84
84 85 def connect_client(self):
85 86 """connect a client with my Context, and track its sockets for cleanup"""
86 87 c = Client(profile='iptest', context=self.context)
87 88 for name in filter(lambda n:n.endswith('socket'), dir(c)):
88 89 s = getattr(c, name)
89 90 s.setsockopt(zmq.LINGER, 0)
90 91 self.sockets.append(s)
91 92 return c
92 93
93 94 def assertRaisesRemote(self, etype, f, *args, **kwargs):
94 95 try:
95 96 try:
96 97 f(*args, **kwargs)
97 98 except error.CompositeError as e:
98 99 e.raise_exception()
99 100 except error.RemoteError as e:
100 101 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
101 102 else:
102 103 self.fail("should have raised a RemoteError")
103 104
104 105 def setUp(self):
105 106 BaseZMQTestCase.setUp(self)
106 107 self.client = self.connect_client()
107 108 # start every test with clean engine namespaces:
108 109 self.client.clear(block=True)
109 110 self.base_engine_count=len(self.client.ids)
110 111 self.engines=[]
111 112
112 113 def tearDown(self):
113 114 # self.client.clear(block=True)
114 115 # close fds:
115 for e in filter(lambda e: e.poll() is not None, processes):
116 processes.remove(e)
116 for e in filter(lambda e: e.poll() is not None, launchers):
117 launchers.remove(e)
117 118
118 119 # allow flushing of incoming messages to prevent crash on socket close
119 120 self.client.wait(timeout=2)
120 121 # time.sleep(2)
121 122 self.client.spin()
122 123 self.client.close()
123 124 BaseZMQTestCase.tearDown(self)
124 125 # this will be redundant when pyzmq merges PR #88
125 126 # self.context.term()
126 127 # print tempfile.TemporaryFile().fileno(),
127 128 # sys.stdout.flush()
128 129 No newline at end of file
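assertRaisesRemote is the helper most of the suite leans on: it unwraps a CompositeError into the underlying RemoteError and compares exception names, since the original exception class does not survive the trip back from the engine. A sketch of typical use (hypothetical test, modeled on test_get_after_error in the next file):

    class ExampleTest(ClusterTestCase):

        def test_remote_zero_division(self):
            ar = self.client[-1].apply_async(lambda : 1/0)
            ar.wait()
            # the remote ZeroDivisionError arrives as a RemoteError
            self.assertRaisesRemote(ZeroDivisionError, ar.get)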
@@ -1,69 +1,68 b''
1 1 """Tests for asyncresult.py"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14
15 15 from IPython.parallel.error import TimeoutError
16 16
17 17 from IPython.parallel.tests import add_engines
18 18 from .clienttest import ClusterTestCase
19 19
20 20 def setup():
21 21 add_engines(2)
22 22
23 23 def wait(n):
24 24 import time
25 25 time.sleep(n)
26 26 return n
27 27
28 28 class AsyncResultTest(ClusterTestCase):
29 29
30 30 def test_single_result(self):
31 31 eid = self.client.ids[-1]
32 32 ar = self.client[eid].apply_async(lambda : 42)
33 33 self.assertEquals(ar.get(), 42)
34 34 ar = self.client[[eid]].apply_async(lambda : 42)
35 35 self.assertEquals(ar.get(), [42])
36 36 ar = self.client[-1:].apply_async(lambda : 42)
37 37 self.assertEquals(ar.get(), [42])
38 38
39 39 def test_get_after_done(self):
40 40 ar = self.client[-1].apply_async(lambda : 42)
41 self.assertFalse(ar.ready())
42 41 ar.wait()
43 42 self.assertTrue(ar.ready())
44 43 self.assertEquals(ar.get(), 42)
45 44 self.assertEquals(ar.get(), 42)
46 45
47 46 def test_get_before_done(self):
48 47 ar = self.client[-1].apply_async(wait, 0.1)
49 48 self.assertRaises(TimeoutError, ar.get, 0)
50 49 ar.wait(0)
51 50 self.assertFalse(ar.ready())
52 51 self.assertEquals(ar.get(), 0.1)
53 52
54 53 def test_get_after_error(self):
55 54 ar = self.client[-1].apply_async(lambda : 1/0)
56 55 ar.wait()
57 56 self.assertRaisesRemote(ZeroDivisionError, ar.get)
58 57 self.assertRaisesRemote(ZeroDivisionError, ar.get)
59 58 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
60 59
61 60 def test_get_dict(self):
62 61 n = len(self.client)
63 62 ar = self.client[:].apply_async(lambda : 5)
64 63 self.assertEquals(ar.get(), [5]*n)
65 64 d = ar.get_dict()
66 65 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
67 66 for eid,r in d.iteritems():
68 67 self.assertEquals(r, 5)
69 68