##// END OF EJS Templates
add EvalFormatter for batch system (PBS) launcher templates...
MinRK -
Show More
@@ -1,1067 +1,1070 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23
24 24 # signal imports, handling various platforms, versions
25 25
26 26 from signal import SIGINT, SIGTERM
27 27 try:
28 28 from signal import SIGKILL
29 29 except ImportError:
30 30 # Windows
31 31 SIGKILL=SIGTERM
32 32
33 33 try:
34 34 # Windows >= 2.7, 3.2
35 35 from signal import CTRL_C_EVENT as SIGINT
36 36 except ImportError:
37 37 pass
38 38
39 39 from subprocess import Popen, PIPE, STDOUT
40 40 try:
41 41 from subprocess import check_output
42 42 except ImportError:
43 43 # pre-2.7, define check_output with Popen
    def check_output(*args, **kwargs):
        """Minimal backport of :func:`subprocess.check_output` for Python < 2.7.

        Forces ``stdout=PIPE``, runs the command, and returns the captured
        stdout.  NOTE(review): unlike the real ``check_output``, this shim
        does *not* raise ``CalledProcessError`` on a nonzero exit status —
        callers in this module only inspect the output text.
        """
        kwargs.update(dict(stdout=PIPE))
        p = Popen(*args, **kwargs)
        out,err = p.communicate()
        return out
49 49
50 50 from zmq.eventloop import ioloop
51 51
52 52 # from IPython.config.configurable import Configurable
53 from IPython.utils.text import EvalFormatter
53 54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
54 55 from IPython.utils.path import get_ipython_module_path
55 56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
56 57
57 58 from IPython.parallel.factory import LoggingFactory
58 59
59 60 from .win32support import forward_read_events
60 61
61 62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62 63
63 64 WINDOWS = os.name == 'nt'
64 65
65 66 #-----------------------------------------------------------------------------
66 67 # Paths to the kernel apps
67 68 #-----------------------------------------------------------------------------
68 69
69 70
70 71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 72 'IPython.parallel.apps.ipclusterapp'
72 73 ))
73 74
74 75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 76 'IPython.parallel.apps.ipengineapp'
76 77 ))
77 78
78 79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 80 'IPython.parallel.apps.ipcontrollerapp'
80 81 ))
81 82
82 83 #-----------------------------------------------------------------------------
83 84 # Base launchers and errors
84 85 #-----------------------------------------------------------------------------
85 86
86 87
class LauncherError(Exception):
    """Base class for all errors raised by process launchers."""
    pass
89 90
90 91
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state."""
    pass
93 94
94 95
class UnknownStatus(LauncherError):
    """Raised when a launcher cannot determine the status of its process."""
    pass
97 98
98 99
class BaseLauncher(LoggingFactory):
    """An abstraction for starting, stopping and signaling a process.

    Subclasses implement :meth:`find_args`, :meth:`start`, :meth:`stop` and
    :meth:`signal`.  The launcher moves through a simple state machine:
    ``'before'`` -> ``'running'`` -> ``'after'``, driven by
    :meth:`notify_start` and :meth:`notify_stop`.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # data passed to notify_start / notify_stop, kept for late subscribers
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        # lazily bind to the process-global zmq IOLoop
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before' # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subcommand should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        if self.state == 'running':
            return True
        else:
            return False

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        processing stopping, like errors that occur while the process is
        attempting to be shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register *f* to be called with ``stop_data`` when the process stops.

        If the process has already stopped, *f* is invoked immediately with
        the recorded ``stop_data``; otherwise it is queued.
        """
        if self.state=='after':
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        self.log.info('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger all the deferreds from :func:`observe_stop`."""

        self.log.info('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # pop() drains the callback list (LIFO) so each fires exactly once
        for i in range(len(self.stop_callbacks)):
            d = self.stop_callbacks.pop()
            d(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
221 222
222 223
223 224 #-----------------------------------------------------------------------------
224 225 # Local process launchers
225 226 #-----------------------------------------------------------------------------
226 227
227 228
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.  stdout/stderr are drained through ioloop handlers
    and re-logged; a periodic poller detects process exit.
    """

    # This is used to to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    poll_frequency = Int(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.start_deferred = None
        self.poller = None

    def find_args(self):
        # argv comes straight from the configurable cmd_and_args list
        return self.cmd_and_args

    def start(self):
        """Spawn the subprocess and wire up output handlers and the poller.

        Raises ProcessStateError if the launcher was already started.
        """
        if self.state == 'before':
            self.process = Popen(self.args,
                stdout=PIPE,stderr=PIPE,stdin=PIPE,
                env=os.environ,
                cwd=self.work_dir
            )
            if WINDOWS:
                # pipes can't be registered with the ioloop directly on
                # Windows; forward their reads through zmq sockets instead
                self.stdout = forward_read_events(self.process.stdout)
                self.stderr = forward_read_events(self.process.stderr)
            else:
                self.stdout = self.process.stdout.fileno()
                self.stderr = self.process.stderr.fileno()
            self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
            self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
            self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
            self.poller.start()
            self.notify_start(self.process.pid)
        else:
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send *sig* to the child; no-op unless currently running."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            self.log.debug("interrupt failed")
            pass
        # keep a reference on self so the DelayedCallback isn't GC'd
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        """ioloop handler: re-log one line of child stdout."""
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.info(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        """ioloop handler: re-log one line of child stderr as error."""
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.error(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check for exit; on exit, tear down handlers and notify observers."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
326 327
class LocalControllerLauncher(LocalProcessLauncher):
    """Run an ipcontroller as a plain local subprocess."""

    # Popen command used to start ipcontroller.
    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")
    # Extra command-line arguments passed through to ipcontroller.
    controller_args = List(['--log-to-file', 'log_level=%i' % logging.INFO], config=True,
        help="""command-line args to pass to ipcontroller""")

    def find_args(self):
        """Full argv: the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.controller_args.append('profile_dir=%s' % profile_dir)
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
345 346
346 347
class LocalEngineLauncher(LocalProcessLauncher):
    """Run a single ipengine as a plain local subprocess."""

    # Popen command used to start the engine.
    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Extra command-line arguments passed through to ipengine.
    engine_args = List(['--log-to-file', 'log_level=%i' % logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )

    def find_args(self):
        """Full argv: the engine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, profile_dir):
        """Start the engine by profile_dir."""
        self.engine_args.append('profile_dir=%s' % profile_dir)
        self.profile_dir = unicode(profile_dir)
        return super(LocalEngineLauncher, self).start()
365 366
366 367
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    Creates one ``launcher_class`` instance per engine and fans out
    start/signal/stop calls to all of them; reports a single stop event
    once every engine has stopped.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
    # launcher class
    launcher_class = LocalEngineLauncher

    # per-engine launchers, keyed by engine index
    launchers = Dict()
    # stop_data from each engine that has stopped, keyed by engine index
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # instance dict, so stop records aren't shared via the class trait
        self.stop_data = {}

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.profile_dir = unicode(profile_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
            # Copy the engine args over to each engine launcher.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(profile_dir)
            if i==0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        # The consumeErrors here could be dangerous
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        # dfinal.addCallback(self.notify_start)
        return dlist

    def find_args(self):
        # no single argv for a set; placeholder used in log messages
        return ['engine set']

    def signal(self, sig):
        """Forward *sig* to every engine launcher."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """INT-then-KILL every engine launcher."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """on_stop callback: record one engine's exit; fire notify_stop last.

        Identifies the stopped launcher by pid.  NOTE(review): relies on the
        loop variable ``idx`` after the loop, so assumes ``launchers`` is
        non-empty and contains the matching pid — true when invoked as an
        engine's on_stop callback.
        """
        pid = data['pid']
        for idx,el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
438 439
439 440
440 441 #-----------------------------------------------------------------------------
441 442 # MPIExec launchers
442 443 #-----------------------------------------------------------------------------
443 444
444 445
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process under mpiexec."""

    # The mpiexec command itself (e.g. ['mpiexec']).
    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    # Arguments for mpiexec itself, inserted before the program.
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    # The program mpiexec should run.
    program = List(['date'], config=True,
        help="The program to start via mpiexec.")
    # Arguments for the program.
    program_args = List([], config=True,
        help="The command line argument to the program."
    )
    # Number of MPI processes to start.
    n = Int(1)

    def find_args(self):
        """Assemble the full mpiexec command line from all the fields."""
        argv = list(self.mpi_cmd)
        argv.extend(['-n', str(self.n)])
        argv.extend(self.mpi_args)
        argv.extend(self.program)
        argv.extend(self.program_args)
        return argv

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
470 471
471 472
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # Popen command used to start ipcontroller.
    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="Popen command to launch the Controller"
    )
    # Extra command-line arguments passed through to ipcontroller.
    controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcontroller."
    )
    n = Int(1)

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.controller_args.extend(['profile_dir=%s'%profile_dir])
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Assemble the full mpiexec + ipcontroller command line.

        ``str(self.n)``: every argv element must be a string, both for
        Popen and for the base class's ``arg_str`` (``' '.join``) — the
        parent MPIExecLauncher.find_args already does this.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
493 494
494 495
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Start a set of ipengines under a single mpiexec invocation."""

    # Popen command used to start each engine.
    program = List(ipengine_cmd_argv, config=True,
        help="Popen command for ipengine"
    )
    # Extra command-line arguments passed through to ipengine.
    program_args = List(
        ['--log-to-file', 'log_level=%i' % logging.INFO], config=True,
        help="Command line arguments for ipengine."
    )
    n = Int(1)

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.program_args.append('profile_dir=%s' % profile_dir)
        self.profile_dir = unicode(profile_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)
513 514
514 515 #-----------------------------------------------------------------------------
515 516 # SSH launchers
516 517 #-----------------------------------------------------------------------------
517 518
518 519 # TODO: Get SSH Launcher working again.
519 520
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    # -tt forces tty allocation, which makes the '~.' escape in signal() work
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    program = List(['date'], config=True,
        help="Program to launch via ssh")
    program_args = List([], config=True,
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    def _hostname_changed(self, name, old, new):
        # trait observer: keep `location` in sync when hostname changes
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # trait observer: keep `location` in sync when user changes
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """Full argv: ssh + ssh args + target + remote program + its args."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, profile_dir, hostname=None, user=None):
        """Start the remote program; optionally override hostname/user."""
        self.profile_dir = unicode(profile_dir)
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        """'Signal' the remote process by closing the ssh connection.

        NOTE(review): *sig* is ignored; '~.' is ssh's connection-terminate
        escape sequence, which requires a tty (hence the -tt default).
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
570 571
571 572
572 573
class SSHControllerLauncher(SSHLauncher):
    """Launch an ipcontroller on a remote host via ssh."""

    program = List(ipcontroller_cmd_argv, config=True,
        help="remote ipcontroller command.")
    # --reuse-files so connection files stay valid for remote engines
    program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to ipcontroller.")
579 580
580 581
class SSHEngineLauncher(SSHLauncher):
    """Launch a single ipengine on a remote host via ssh."""

    program = List(ipengine_cmd_argv, config=True,
        help="remote ipengine command.")
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to ipengine."
    )
589 590
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines across remote hosts via ssh."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def start(self, n, profile_dir):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        self.profile_dir = unicode(profile_dir)
        dlist = []
        for host, n in self.engines.iteritems():
            # each value is either a plain engine count, or (count, args)
            if isinstance(n, (tuple, list)):
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            # allow 'user@host' keys
            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)

                # Copy the engine args over to each engine launcher.
                # (removed a stray no-op `i` expression statement that was
                # leftover debugging residue here)
                el.program_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(profile_dir, user=user, hostname=host)
                if i==0:
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                self.launchers[host+str(i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
627 628
628 629
629 630
630 631 #-----------------------------------------------------------------------------
631 632 # Windows HPC Server 2008 scheduler launchers
632 633 #-----------------------------------------------------------------------------
633 634
634 635
635 636 # This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC 'job' command.

    On Windows, try to resolve the full path via find_cmd; fall back to the
    bare name 'job' on any failure.  On other platforms just return 'job'.
    """
    # This is only used on Windows.
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
645 646
646 647
class WindowsHPCLauncher(BaseLauncher):
    """Launch a job via the Windows HPC Server 2008 'job' scheduler command.

    Subclasses implement :meth:`write_job_file` to produce the XML job
    description that gets submitted with ``job submit``.
    """

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
    )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the `job_file` property defined
    # below (the property wins on the class), so the trait is effectively dead.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        # dynamic path: work_dir + configured file name
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        # only used for logging/process naming; submission builds its own argv
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        # Twisted will raise DeprecationWarnings if we try to pass unicode to this
        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job and notify observers with the output."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except:
            # best-effort: the job may already be gone
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
725 726
726 727
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch an ipcontroller via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    extra_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write the single-task controller job XML to ``self.job_file``."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir and from self.start().
        t.controller_args.extend(self.extra_args)
        job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # job files live in the profile dir, not work_dir as in the base class
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.extra_args = ['profile_dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
758 759
759 760
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of ipengines via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    extra_args = List([], config=False,
        help="extra args to pas to ipengine")

    def write_job_file(self, n):
        """Write an engine-set job XML with one task per engine."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and from self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # job files live in the profile dir, not work_dir as in the base class
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n, profile_dir):
        """Start n engines by profile_dir."""
        self.extra_args = ['profile_dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
792 793
793 794
794 795 #-----------------------------------------------------------------------------
795 796 # Batch (PBS) system launchers
796 797 #-----------------------------------------------------------------------------
797 798
798 799 class BatchSystemLauncher(BaseLauncher):
799 800 """Launch an external process using a batch system.
800 801
801 802 This class is designed to work with UNIX batch systems like PBS, LSF,
802 803 GridEngine, etc. The overall model is that there are different commands
803 804 like qsub, qdel, etc. that handle the starting and stopping of the process.
804 805
805 806 This class also has the notion of a batch script. The ``batch_template``
806 807 attribute can be set to a string that is a template for the batch script.
807 808 This template is instantiated using string formatting. Thus the template can
808 809 use {n} fot the number of instances. Subclasses can add additional variables
809 810 to the template dict.
810 811 """
811 812
812 813 # Subclasses must fill these in. See PBSEngineSet
813 814 submit_command = List([''], config=True,
814 815 help="The name of the command line program used to submit jobs.")
815 816 delete_command = List([''], config=True,
816 817 help="The name of the command line program used to delete jobs.")
817 818 job_id_regexp = Unicode('', config=True,
818 819 help="""A regular expression used to get the job id from the output of the
819 820 submit_command.""")
820 821 batch_template = Unicode('', config=True,
821 822 help="The string that is the batch script template itself.")
822 823 batch_template_file = Unicode(u'', config=True,
823 824 help="The file that contains the batch template.")
824 825 batch_file_name = Unicode(u'batch_script', config=True,
825 826 help="The filename of the instantiated batch script.")
826 827 queue = Unicode(u'', config=True,
827 828 help="The PBS Queue.")
828 829
829 830 # not configurable, override in subclasses
830 831 # PBS Job Array regex
831 832 job_array_regexp = Unicode('')
832 833 job_array_template = Unicode('')
833 834 # PBS Queue regex
834 835 queue_regexp = Unicode('')
835 836 queue_template = Unicode('')
836 837 # The default batch template, override in subclasses
837 838 default_template = Unicode('')
838 839 # The full path to the instantiated batch script.
839 840 batch_file = Unicode(u'')
840 841 # the format dict used with batch_template:
841 842 context = Dict()
843 # the Formatter instance for rendering the templates:
844 formatter = Instance(EvalFormatter, (), {})
842 845
843 846
844 847 def find_args(self):
845 848 return self.submit_command + [self.batch_file]
846 849
847 850 def __init__(self, work_dir=u'.', config=None, **kwargs):
848 851 super(BatchSystemLauncher, self).__init__(
849 852 work_dir=work_dir, config=config, **kwargs
850 853 )
851 854 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
852 855
853 856 def parse_job_id(self, output):
854 857 """Take the output of the submit command and return the job id."""
855 858 m = re.search(self.job_id_regexp, output)
856 859 if m is not None:
857 860 job_id = m.group()
858 861 else:
859 862 raise LauncherError("Job id couldn't be determined: %s" % output)
860 863 self.job_id = job_id
861 864 self.log.info('Job submitted with job id: %r' % job_id)
862 865 return job_id
863 866
    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir.

        Parameters
        ----------
        n : int
            Number of engines requested; exposed to the template as ``{n}``.

        The template is chosen in priority order: ``batch_template``,
        then the contents of ``batch_template_file``, then
        ``default_template``.  Missing job-array and queue directives are
        injected just after the template's first line before rendering.
        """
        self.context['n'] = n
        self.context['queue'] = self.queue
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

        # inject a job-array directive if the user's template lacks one
        regex = re.compile(self.job_array_regexp)
        if not regex.search(self.batch_template):
            self.log.info("adding job array settings to batch script")
            # insert after the first line (typically the shebang / shell line)
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

        # likewise inject a queue directive when a queue was configured
        regex = re.compile(self.queue_regexp)
        if self.queue and not regex.search(self.batch_template):
            self.log.info("adding PBS queue settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

        # render {n}, {queue}, {profile_dir}, ... via the EvalFormatter
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # owner read/write/execute only
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
897 900
    def start(self, n, profile_dir):
        """Start n copies of the process using a batch system.

        Writes the batch script, submits it, and returns the parsed job id.
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.context['profile_dir'] = profile_dir
        self.profile_dir = unicode(profile_dir)  # Python 2 unicode coercion
        self.write_batch_script(n)
        # inherit the environment so the batch system sees PATH/PYTHONPATH etc.
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
910 913
    def stop(self):
        """Delete the submitted job via the batch system's delete command."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
915 918
916 919
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # fixed: help text previously said ['qsub'] for the delete command
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    # fixed typo: "expresion" -> "expression"
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # regex/template pairs: when the regex does not match the user's template,
    # write_batch_script() inserts the corresponding directive line.
    job_array_regexp = Unicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = Unicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
932 935
933 936
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # Default PBS script: export the environment (-V), name the job, and run
    # ipcontroller; {profile_dir} is filled in by the template formatter.
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file profile_dir={profile_dir}
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, profile_dir):
        """Start the controller by profile or profile_dir."""
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        # a controller is always a single process, hence n=1
        return super(PBSControllerLauncher, self).start(1, profile_dir)
949 952
950 953
class PBSEngineSetLauncher(PBSLauncher):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Default PBS script for the engines; write_batch_script() injects the
    # '#PBS -t 1-{n}' job-array directive so n engines are started.
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s profile_dir={profile_dir}
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        return super(PBSEngineSetLauncher, self).start(n, profile_dir)
965 968
966 969 #SGE is very similar to PBS
967 970
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    job_array_regexp = Unicode(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = Unicode(r'#\$\W+-q\W+\$?\w+')
    # fixed: was '#$ -q $queue' -- the format-style template engine never
    # substitutes '$queue', so the configured queue was silently ignored.
    # '{queue}' matches the PBS counterpart and the formatter's syntax.
    queue_template = Unicode('#$ -q {queue}')
974 977
class SGEControllerLauncher(SGELauncher):
    """Launch a controller using SGE."""

    # fixed typo in help: "ipontroller" -> "ipcontroller"
    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # Default SGE script; {profile_dir} is filled in by the template formatter.
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file profile_dir={profile_dir}
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, profile_dir):
        """Start the controller by profile or profile_dir."""
        # fixed: log message previously named PBSControllerLauncher
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        # a controller is always a single process, hence n=1
        return super(SGEControllerLauncher, self).start(1, profile_dir)
990 993
class SGEEngineSetLauncher(SGELauncher):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Default SGE script; write_batch_script() injects the '#$ -t 1-{n}'
    # job-array directive so n engines are started.
    # consistency: u'' literal like the sibling launchers' templates
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s profile_dir={profile_dir}
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
        return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1005 1008
1006 1009
1007 1010 #-----------------------------------------------------------------------------
1008 1011 # A launcher for ipcluster itself!
1009 1012 #-----------------------------------------------------------------------------
1010 1013
1011 1014
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    # subcommand is passed as '--<subcommand>' (e.g. '--start')
    ipcluster_subcommand = Unicode('start')
    # default number of engines to request
    ipcluster_n = Int(2)

    def find_args(self):
        """Build the full ipcluster command line from the traits above."""
        return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
            ['n=%i'%self.ipcluster_n] + self.ipcluster_args

    def start(self):
        """Start the ipcluster subprocess."""
        self.log.info("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
1030 1033
1031 1034 #-----------------------------------------------------------------------------
1032 1035 # Collections of launchers
1033 1036 #-----------------------------------------------------------------------------
1034 1037
# Launcher classes grouped by the kind of infrastructure they target.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]
mpi_launchers = [
    MPIExecLauncher,
    MPIExecControllerLauncher,
    MPIExecEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]
# Flat collection of every launcher class defined above.
all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
                + pbs_launchers + sge_launchers
@@ -1,521 +1,560 b''
1 1 # encoding: utf-8
2 2 """
3 3 Utilities for working with strings and text.
4 4 """
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import __main__
18 18
19 19 import os
20 20 import re
21 21 import shutil
22 from string import Formatter
22 23
23 24 from IPython.external.path import path
24 25
25 26 from IPython.utils.io import nlprint
26 27 from IPython.utils.data import flatten
27 28
28 29 #-----------------------------------------------------------------------------
29 30 # Code
30 31 #-----------------------------------------------------------------------------
31 32
32 33
def unquote_ends(istr):
    """Remove a single pair of matching quotes from the endpoints of a string."""
    if not istr:
        return istr
    first, last = istr[0], istr[-1]
    if first == last and first in ("'", '"'):
        return istr[1:-1]
    return istr
43 44
44 45
class LSString(str):
    """String derivative with a special access attributes.

    These are normal strings, but with the special attributes:

        .l (or .list) : value as list (split on newlines).
        .n (or .nlstr): original value (the string itself).
        .s (or .spstr): value as whitespace-separated string.
        .p (or .paths): list of path objects

    Any values which require transformations are computed only once and
    cached.

    Such strings are very useful to efficiently interact with the shell, which
    typically only understands whitespace-separated options for commands."""

    def get_list(self):
        """Lines of the string, split on newlines (cached)."""
        try:
            return self.__lines
        except AttributeError:
            self.__lines = self.split('\n')
            return self.__lines

    l = list = property(get_list)

    def get_spstr(self):
        """The string with newlines collapsed to single spaces (cached)."""
        try:
            return self.__space_joined
        except AttributeError:
            self.__space_joined = self.replace('\n', ' ')
            return self.__space_joined

    s = spstr = property(get_spstr)

    def get_nlstr(self):
        """The original newline-joined string (the string itself)."""
        return self

    n = nlstr = property(get_nlstr)

    def get_paths(self):
        """Lines that name existing filesystem entries, as path objects (cached)."""
        try:
            return self.__path_objs
        except AttributeError:
            self.__path_objs = [path(p) for p in self.split('\n')
                                if os.path.exists(p)]
            return self.__path_objs

    p = paths = property(get_paths)
92 93
93 94 # FIXME: We need to reimplement type specific displayhook and then add this
94 95 # back as a custom printer. This should also be moved outside utils into the
95 96 # core.
96 97
97 98 # def print_lsstring(arg):
98 99 # """ Prettier (non-repr-like) and more informative printer for LSString """
99 100 # print "LSString (.p, .n, .l, .s available). Value:"
100 101 # print arg
101 102 #
102 103 #
103 104 # print_lsstring = result_display.when_type(LSString)(print_lsstring)
104 105
105 106
class SList(list):
    """List derivative with a special access attributes.

    These are normal lists, but with the special attributes:

    .l (or .list) : value as list (the list itself).
    .n (or .nlstr): value as a string, joined on newlines.
    .s (or .spstr): value as a string, joined on spaces.
    .p (or .paths): list of path objects

    Any values which require transformations are computed only once and
    cached."""

    def get_list(self):
        """Return the list itself (no transformation needed)."""
        return self

    l = list = property(get_list)

    def get_spstr(self):
        """Elements joined on single spaces (cached after first access)."""
        try:
            return self.__spstr
        except AttributeError:
            self.__spstr = ' '.join(self)
            return self.__spstr

    s = spstr = property(get_spstr)

    def get_nlstr(self):
        """Elements joined on newlines (cached after first access)."""
        try:
            return self.__nlstr
        except AttributeError:
            self.__nlstr = '\n'.join(self)
            return self.__nlstr

    n = nlstr = property(get_nlstr)

    def get_paths(self):
        """Elements naming existing filesystem entries, as path objects (cached)."""
        try:
            return self.__paths
        except AttributeError:
            self.__paths = [path(p) for p in self if os.path.exists(p)]
            return self.__paths

    p = paths = property(get_paths)

    def grep(self, pattern, prune = False, field = None):
        """ Return all strings matching 'pattern' (a regex or callable)

        This is case-insensitive. If prune is true, return all items
        NOT matching the pattern.

        If field is specified, the match must occur in the specified
        whitespace-separated field.

        Examples::

            a.grep( lambda x: x.startswith('C') )
            a.grep('Cha.*log', prune=1)
            a.grep('chm', field=-1)
        """

        def match_target(s):
            # restrict matching to one whitespace-separated field if requested
            if field is None:
                return s
            parts = s.split()
            try:
                tgt = parts[field]
                return tgt
            except IndexError:
                # line has too few fields: treat as non-matching
                return ""

        # a string pattern becomes a case-insensitive regex predicate;
        # anything else is assumed to be a callable predicate already
        if isinstance(pattern, basestring):
            pred = lambda x : re.search(pattern, x, re.IGNORECASE)
        else:
            pred = pattern
        if not prune:
            return SList([el for el in self if pred(match_target(el))])
        else:
            return SList([el for el in self if not pred(match_target(el))])

    def fields(self, *fields):
        """ Collect whitespace-separated fields from string list

        Allows quick awk-like usage of string lists.

        Example data (in var a, created by 'a = !ls -l')::
            -rwxrwxrwx 1 ville None 18 Dec 14  2006 ChangeLog
            drwxrwxrwx+ 6 ville None 0 Oct 24 18:05 IPython

        a.fields(0) is ['-rwxrwxrwx', 'drwxrwxrwx+']
        a.fields(1,0) is ['1 -rwxrwxrwx', '6 drwxrwxrwx+']
        (note the joining by space).
        a.fields(-1) is ['ChangeLog', 'IPython']

        IndexErrors are ignored.

        Without args, fields() just split()'s the strings.
        """
        if len(fields) == 0:
            return [el.split() for el in self]

        res = SList()
        for el in [f.split() for f in self]:
            lineparts = []

            for fd in fields:
                try:
                    lineparts.append(el[fd])
                except IndexError:
                    # requested field missing on this line: skip it silently
                    pass
            if lineparts:
                res.append(" ".join(lineparts))

        return res

    def sort(self,field= None, nums = False):
        """ sort by specified fields (see fields())

        Example::
            a.sort(1, nums = True)

        Sorts a by second field, in numerical order (so that 21 > 3)

        NOTE: unlike list.sort(), this returns a new sorted SList and does
        not sort in place.
        """

        #decorate, sort, undecorate
        if field is not None:
            dsu = [[SList([line]).fields(field), line] for line in self]
        else:
            dsu = [[line, line] for line in self]
        if nums:
            # replace each sort key with the integer formed by its digits,
            # so '21' sorts after '3'
            for i in range(len(dsu)):
                numstr = "".join([ch for ch in dsu[i][0] if ch.isdigit()])
                try:
                    n = int(numstr)
                except ValueError:
                    # no digits at all: sort such lines first
                    n = 0;
                dsu[i][0] = n


        dsu.sort()
        return SList([t[1] for t in dsu])
248 249
249 250
250 251 # FIXME: We need to reimplement type specific displayhook and then add this
251 252 # back as a custom printer. This should also be moved outside utils into the
252 253 # core.
253 254
254 255 # def print_slist(arg):
255 256 # """ Prettier (non-repr-like) and more informative printer for SList """
256 257 # print "SList (.p, .n, .l, .s, .grep(), .fields(), sort() available):"
257 258 # if hasattr(arg, 'hideonce') and arg.hideonce:
258 259 # arg.hideonce = False
259 260 # return
260 261 #
261 262 # nlprint(arg)
262 263 #
263 264 # print_slist = result_display.when_type(SList)(print_slist)
264 265
265 266
def esc_quotes(strng):
    """Return the input string with single and double quotes escaped out"""
    escaped = strng.replace('"', '\\"')
    return escaped.replace("'", "\\'")
270 271
271 272
def make_quoted_expr(s):
    """Return string s in appropriate quotes, using raw string if possible.

    XXX - example removed because it caused encoding errors in documentation
    generation. We need a new example that doesn't contain invalid chars.

    Note the use of raw string and padding at the end to allow trailing
    backslash.
    """

    prefix = 'u'
    suffix = ''
    padding = ''
    if '\\' in s:
        # backslashes present: emit a raw string
        prefix += 'r'
        if s.endswith('\\'):
            # a raw string cannot end in a backslash: pad it and slice
            # the padding back off in the generated expression
            suffix = '[:-1]'
            padding = '_'
    if '"' not in s:
        quote = '"'
    elif "'" not in s:
        quote = "'"
    elif '"""' not in s and not s.endswith('"'):
        quote = '"""'
    elif "'''" not in s and not s.endswith("'"):
        quote = "'''"
    else:
        # give up, backslash-escaped string will do
        return '"%s"' % esc_quotes(s)
    return prefix + quote + s + padding + quote + suffix
304 305
305 306
def qw(words,flat=0,sep=None,maxsplit=-1):
    """Similar to Perl's qw() operator, but with some more options.

    qw(words,flat=0,sep=' ',maxsplit=-1) -> words.split(sep,maxsplit)

    words can also be a list itself, and with flat=1, the output will be
    recursively flattened.

    Examples:

    >>> qw('1 2')
    ['1', '2']

    >>> qw(['a b','1 2',['m n','p q']])
    [['a', 'b'], ['1', '2'], [['m', 'n'], ['p', 'q']]]

    >>> qw(['a b','1 2',['m n','p q']],flat=1)
    ['a', 'b', '1', '2', 'm', 'n', 'p', 'q']
    """

    # string input: split and drop empty / whitespace-only pieces
    if isinstance(words, basestring):
        return [word.strip() for word in words.split(sep,maxsplit)
                if word and not word.isspace() ]
    # list input: recurse into each element (forcing flat=1 when flattening)
    if flat:
        return flatten(map(qw,words,[1]*len(words)))
    return map(qw,words)
332 333
333 334
def qwflat(words,sep=None,maxsplit=-1):
    """Calls qw(words) in flat mode. It's just a convenient shorthand."""
    return qw(words,1,sep,maxsplit)
337 338
338 339
def qw_lol(indata):
    """qw_lol('a b') -> [['a','b']],
    otherwise it's just a call to qw().

    We need this to make sure the modules_some keys *always* end up as a
    list of lists."""

    # wrap the single-string case in an extra list level
    if isinstance(indata, basestring):
        return [qw(indata)]
    else:
        return qw(indata)
350 351
351 352
def grep(pat,list,case=1):
    """Simple minded grep-like function.
    grep(pat,list) returns occurrences of pat in list, None on failure.

    It only does simple string matching, with no support for regexps. Use the
    option case=0 for case-insensitive matching."""

    # This is pretty crude. At least it should implement copying only references
    # to the original data in case it's big. Now it copies the data for output.
    if case:
        matches = [term for term in list if term.find(pat) > -1]
    else:
        needle = pat.lower()
        matches = [term for term in list if term.lower().find(needle) > -1]

    if matches:
        return matches
    return None
372 373
373 374
def dgrep(pat,*opts):
    """Return grep() on dir()+dir(__builtins__).

    A very common use of grep() when working interactively."""

    # search the names visible in the interactive __main__ namespace
    return grep(pat,dir(__main__)+dir(__main__.__builtins__),*opts)
380 381
381 382
def idgrep(pat):
    """Case-insensitive dgrep()"""

    # 0 is the 'case' argument of grep(): case-insensitive matching
    return dgrep(pat,0)
386 387
387 388
def igrep(pat,list):
    """Synonym for case-insensitive grep."""

    return grep(pat,list,case=0)
392 393
393 394
def indent(instr,nspaces=4, ntabs=0, flatten=False):
    """Indent a string a given number of spaces or tabstops.

    indent(str,nspaces=4,ntabs=0) -> indent str by ntabs+nspaces.

    Parameters
    ----------

    instr : basestring
        The string to be indented.
    nspaces : int (default: 4)
        The number of spaces to be indented.
    ntabs : int (default: 0)
        The number of tabs to be indented.
    flatten : bool (default: False)
        Whether to scrub existing indentation.  If True, all lines will be
        aligned to the same indentation.  If False, existing indentation will
        be strictly increased.

    Returns
    -------

    str|unicode : string indented by ntabs and nspaces.

    """
    if instr is None:
        return
    prefix = '\t' * ntabs + ' ' * nspaces
    if flatten:
        # swallow any existing leading whitespace so all lines align
        pattern = re.compile(r'^\s*', re.MULTILINE)
    else:
        pattern = re.compile(r'^', re.MULTILINE)
    result = pattern.sub(prefix, instr)
    # drop the dangling indent that sub() leaves after a trailing newline
    if result.endswith(os.linesep + prefix):
        return result[:-len(prefix)]
    return result
431 432
def native_line_ends(filename,backup=1):
    """Convert (in-place) a file to line-ends native to the current OS.

    If the optional backup argument is given as false, no backup of the
    original file is left. """

    # per-OS suffix appended to the backup copy's name
    backup_suffixes = {'posix':'~','dos':'.bak','nt':'.bak','mac':'.bak'}

    bak_filename = filename + backup_suffixes[os.name]

    original = open(filename).read()
    # keep a backup so we can restore it if the rewrite fails
    shutil.copy2(filename,bak_filename)
    try:
        new = open(filename,'wb')
        new.write(os.linesep.join(original.splitlines()))
        new.write(os.linesep) # ALWAYS put an eol at the end of the file
        new.close()
    except:
        # rewrite failed: restore the original from the backup
        os.rename(bak_filename,filename)
    if not backup:
        try:
            os.remove(bak_filename)
        except:
            pass
456 457
457 458
def list_strings(arg):
    """Always return a list of strings, given a string or list of strings
    as input.

    :Examples:

        In [7]: list_strings('A single string')
        Out[7]: ['A single string']

        In [8]: list_strings(['A single string in a list'])
        Out[8]: ['A single string in a list']

        In [9]: list_strings(['A','list','of','strings'])
        Out[9]: ['A', 'list', 'of', 'strings']
    """

    # wrap a lone string; anything else is passed through unchanged
    if isinstance(arg,basestring): return [arg]
    else: return arg
476 477
477 478
def marquee(txt='',width=78,mark='*'):
    """Return the input string centered in a 'marquee'.

    :Examples:

        In [16]: marquee('A test',40)
        Out[16]: '**************** A test ****************'

        In [17]: marquee('A test',40,'-')
        Out[17]: '---------------- A test ----------------'

        In [18]: marquee('A test',40,' ')
        Out[18]: '                 A test                 '

    """
    if not txt:
        return (mark*width)[:width]
    # floor division: '/' would produce a float on Python 3 and break
    # the 'mark*nmark' repetition below; '//' is identical on Python 2 ints
    nmark = (width-len(txt)-2)//len(mark)//2
    if nmark < 0: nmark =0
    marks = mark*nmark
    return '%s %s %s' % (marks,txt,marks)
499 500
500 501
# matches the run of leading whitespace at the start of a string
ini_spaces_re = re.compile(r'^(\s+)')

def num_ini_spaces(strng):
    """Return the number of initial spaces in a string"""

    match = ini_spaces_re.match(strng)
    if match is None:
        return 0
    return match.end()
511 512
512 513
def format_screen(strng):
    """Format a string for screen printing.

    This removes some latex-type format codes."""
    # strip backslash line-continuations (latex-style paragraph continues)
    continuation_re = re.compile(r'\\$', re.MULTILINE)
    return continuation_re.sub('', strng)
521 522
523
class EvalFormatter(Formatter):
    """A String Formatter that allows evaluation of simple expressions.

    Any time a format key is not found in the kwargs,
    it will be tried as an expression in the kwargs namespace.

    This is to be used in templating cases, such as the parallel batch
    script templates, where simple arithmetic on arguments is useful.

    .. warning:: Keys are evaluated with ``eval()``, so templates must come
       from trusted sources only -- never format untrusted template strings.

    Examples
    --------

    In [1]: f = EvalFormatter()
    In [2]: f.format('{n/4}', n=8)
    Out[2]: '2'

    In [3]: f.format('{range(3)}')
    Out[3]: '[0, 1, 2]'

    In [4]: f.format('{3*2}')
    Out[4]: '6'
    """

    def get_value(self, key, args, kwargs):
        """Resolve a format field: positional index, keyword, or expression."""
        # NOTE: 'long' makes this Python-2 only
        if isinstance(key, (int, long)):
            return args[key]
        elif key in kwargs:
            return kwargs[key]
        else:
            # evaluate the expression using kwargs as namespace
            try:
                return eval(key, kwargs)
            except Exception:
                # classify all bad expressions as key errors
                raise KeyError(key)
559
560
@@ -1,507 +1,504 b''
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users
19 19 start with the :command:`ipcluster` command as it simplifies many common usage
20 20 cases.
21 21
22 22 General considerations
23 23 ======================
24 24
25 25 Before delving into the details about how you can start a controller and
26 26 engines using the various methods, we outline some of the general issues that
27 27 come up when starting the controller and engines. These things come up no
28 28 matter which method you use to start your IPython cluster.
29 29
30 30 Let's say that you want to start the controller on ``host0`` and engines on
31 31 hosts ``host1``-``hostn``. The following steps are then required:
32 32
33 33 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
34 34 ``host0``.
35 35 2. Move the JSON file (:file:`ipcontroller-engine.json`) created by the
36 36 controller from ``host0`` to hosts ``host1``-``hostn``.
37 37 3. Start the engines on hosts ``host1``-``hostn`` by running
38 38 :command:`ipengine`. This command has to be told where the JSON file
39 39 (:file:`ipcontroller-engine.json`) is located.
40 40
41 41 At this point, the controller and engines will be connected. By default, the JSON files
42 42 created by the controller are put into the :file:`~/.ipython/cluster_default/security`
43 43 directory. If the engines share a filesystem with the controller, step 2 can be skipped as
44 44 the engines will automatically look at that location.
45 45
46 46 The final step required to actually use the running controller from a client is to move
47 47 the JSON file :file:`ipcontroller-client.json` from ``host0`` to any host where clients
48 48 will be run. If these files are put into the :file:`~/.ipython/cluster_default/security`
49 49 directory of the client's host, they will be found automatically. Otherwise, the full path
50 50 to them has to be passed to the client's constructor.
51 51
52 52 Using :command:`ipcluster`
53 53 ===========================
54 54
55 55 The :command:`ipcluster` command provides a simple way of starting a
56 56 controller and engines in the following situations:
57 57
58 58 1. When the controller and engines are all run on localhost. This is useful
59 59 for testing or running on a multicore computer.
60 60 2. When engines are started using the :command:`mpiexec` command that comes
61 61 with most MPI [MPI]_ implementations
62 62 3. When engines are started using the PBS [PBS]_ batch system
63 63 (or other `qsub` systems, such as SGE).
64 64 4. When the controller is started on localhost and the engines are started on
65 65 remote nodes using :command:`ssh`.
66 66 5. When engines are started using the Windows HPC Server batch system.
67 67
68 68 .. note::
69 69
70 70 Currently :command:`ipcluster` requires that the
71 71 :file:`~/.ipython/cluster_<profile>/security` directory live on a shared filesystem that is
72 72 seen by both the controller and engines. If you don't have a shared file
73 73 system you will need to use :command:`ipcontroller` and
74 74 :command:`ipengine` directly.
75 75
76 76 Under the hood, :command:`ipcluster` just uses :command:`ipcontroller`
77 77 and :command:`ipengine` to perform the steps described above.
78 78
79 79 The simplest way to use ipcluster requires no configuration, and will
80 80 launch a controller and a number of engines on the local machine. For instance,
81 81 to start one controller and 4 engines on localhost, just do::
82 82
83 83 $ ipcluster start n=4
84 84
85 85 To see other command line options, do::
86 86
87 87 $ ipcluster -h
88 88
89 89
90 90 Configuring an IPython cluster
91 91 ==============================
92 92
93 93 Cluster configurations are stored as `profiles`. You can create a new profile with::
94 94
95 95 $ ipcluster create profile=myprofile
96 96
97 97 This will create the directory :file:`IPYTHONDIR/cluster_myprofile`, and populate it
98 98 with the default configuration files for the three IPython cluster commands. Once
99 99 you edit those files, you can continue to call ipcluster/ipcontroller/ipengine
100 100 with no arguments beyond ``p=myprofile``, and any configuration will be maintained.
101 101
102 102 There is no limit to the number of profiles you can have, so you can maintain a profile for each
103 103 of your common use cases. The default profile will be used whenever the
104 104 profile argument is not specified, so edit :file:`IPYTHONDIR/cluster_default/*_config.py` to
105 105 represent your most common use case.
106 106
107 107 The configuration files are loaded with commented-out settings and explanations,
108 108 which should cover most of the available possibilities.
109 109
110 110 Using various batch systems with :command:`ipcluster`
111 111 ------------------------------------------------------
112 112
113 113 :command:`ipcluster` has a notion of Launchers that can start controllers
114 114 and engines with various remote execution schemes. Currently supported
115 115 models include :command:`ssh`, :command:`mpiexec`, PBS-style (Torque, SGE),
116 116 and Windows HPC Server.
117 117
118 118 .. note::
119 119
120 120 The Launchers and configuration are designed in such a way that advanced
121 121 users can subclass and configure them to fit their own system that we
122 122 have not yet supported (such as Condor)
123 123
124 124 Using :command:`ipcluster` in mpiexec/mpirun mode
125 125 --------------------------------------------------
126 126
127 127
128 128 The mpiexec/mpirun mode is useful if you:
129 129
130 130 1. Have MPI installed.
131 131 2. Your systems are configured to use the :command:`mpiexec` or
132 132 :command:`mpirun` commands to start MPI processes.
133 133
134 134 If these are satisfied, you can create a new profile::
135 135
136 136 $ ipcluster create profile=mpi
137 137
138 138 and edit the file :file:`IPYTHONDIR/cluster_mpi/ipcluster_config.py`.
139 139
140 140 There, instruct ipcluster to use the MPIExec launchers by adding the lines:
141 141
142 142 .. sourcecode:: python
143 143
144 144 c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.MPIExecEngineSetLauncher'
145 145
146 146 If the default MPI configuration is correct, then you can now start your cluster, with::
147 147
148 148 $ ipcluster start n=4 profile=mpi
149 149
150 150 This does the following:
151 151
152 152 1. Starts the IPython controller on current host.
153 153 2. Uses :command:`mpiexec` to start 4 engines.
154 154
155 155 If you have a reason to also start the Controller with mpi, you can specify:
156 156
157 157 .. sourcecode:: python
158 158
159 159 c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.MPIExecControllerLauncher'
160 160
161 161 .. note::
162 162
163 163 The Controller *will not* be in the same MPI universe as the engines, so there is not
164 164 much reason to do this unless sysadmins demand it.
165 165
166 166 On newer MPI implementations (such as OpenMPI), this will work even if you
167 167 don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI
168 168 implementations actually require each process to call :func:`MPI_Init` upon
169 169 starting. The easiest way of having this done is to install the mpi4py
170 170 [mpi4py]_ package and then specify the ``c.MPI.use`` option in :file:`ipengine_config.py`:
171 171
172 172 .. sourcecode:: python
173 173
174 174 c.MPI.use = 'mpi4py'
175 175
176 176 Unfortunately, even this won't work for some MPI implementations. If you are
177 177 having problems with this, you will likely have to use a custom Python
178 178 executable that itself calls :func:`MPI_Init` at the appropriate time.
179 179 Fortunately, mpi4py comes with such a custom Python executable that is easy to
180 180 install and use. However, this custom Python executable approach will not work
181 181 with :command:`ipcluster` currently.
182 182
183 183 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
184 184
185 185
186 186 Using :command:`ipcluster` in PBS mode
187 187 ---------------------------------------
188 188
189 189 The PBS mode uses the Portable Batch System [PBS]_ to start the engines.
190 190
191 191 As usual, we will start by creating a fresh profile::
192 192
193 193 $ ipcluster create profile=pbs
194 194
195 195 And in :file:`ipcluster_config.py`, we will select the PBS launchers for the controller
196 196 and engines:
197 197
198 198 .. sourcecode:: python
199 199
200 200 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
201 201 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.PBSEngineSetLauncher'
202 202
203 203 IPython does provide simple default batch templates for PBS and SGE, but you may need
204 204 to specify your own. Here is a sample PBS script template:
205 205
206 206 .. sourcecode:: bash
207 207
208 208 #PBS -N ipython
209 209 #PBS -j oe
210 210 #PBS -l walltime=00:10:00
211 #PBS -l nodes=${n/4}:ppn=4
212 #PBS -q $queue
211 #PBS -l nodes={n/4}:ppn=4
212 #PBS -q {queue}
213 213
214 cd $$PBS_O_WORKDIR
215 export PATH=$$HOME/usr/local/bin
216 export PYTHONPATH=$$HOME/usr/local/lib/python2.7/site-packages
217 /usr/local/bin/mpiexec -n ${n} ipengine cluster_dir=${cluster_dir}
214 cd $PBS_O_WORKDIR
215 export PATH=$HOME/usr/local/bin
216 export PYTHONPATH=$HOME/usr/local/lib/python2.7/site-packages
217 /usr/local/bin/mpiexec -n {n} ipengine profile_dir={profile_dir}
218 218
219 219 There are a few important points about this template:
220 220
221 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
222 template engine.
221 1. This template will be rendered at runtime using IPython's :class:`EvalFormatter`.
222 This is simply a subclass of :class:`string.Formatter` that allows simple expressions
223 on keys.
223 224
224 225 2. Instead of putting in the actual number of engines, use the notation
225 ``${n}`` to indicate the number of engines to be started. You can also uses
226 expressions like ``${n/4}`` in the template to indicate the number of
227 nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template.
226 ``{n}`` to indicate the number of engines to be started. You can also use
227 expressions like ``{n/4}`` in the template to indicate the number of nodes.
228 There will always be ``{n}`` and ``{profile_dir}`` variables passed to the formatter.
228 229 These allow the batch system to know how many engines, and where the configuration
229 files reside. The same is true for the batch queue, with the template variable ``$queue``.
230 files reside. The same is true for the batch queue, with the template variable
231 ``{queue}``.
230 232
231 3. Because ``$`` is a special character used by the template engine, you must
232 escape any ``$`` by using ``$$``. This is important when referring to
233 environment variables in the template, or in SGE, where the config lines start
234 with ``#$``, which will have to be ``#$$``.
235
236 4. Any options to :command:`ipengine` can be given in the batch script
233 3. Any options to :command:`ipengine` can be given in the batch script
237 234 template, or in :file:`ipengine_config.py`.
238 235
239 5. Depending on the configuration of you system, you may have to set
236 4. Depending on the configuration of your system, you may have to set
240 237 environment variables in the script template.
241 238
242 239 The controller template should be similar, but simpler:
243 240
244 241 .. sourcecode:: bash
245 242
246 243 #PBS -N ipython
247 244 #PBS -j oe
248 245 #PBS -l walltime=00:10:00
249 246 #PBS -l nodes=1:ppn=4
250 #PBS -q $queue
247 #PBS -q {queue}
251 248
252 cd $$PBS_O_WORKDIR
253 export PATH=$$HOME/usr/local/bin
254 export PYTHONPATH=$$HOME/usr/local/lib/python2.7/site-packages
255 ipcontroller cluster_dir=${cluster_dir}
249 cd $PBS_O_WORKDIR
250 export PATH=$HOME/usr/local/bin
251 export PYTHONPATH=$HOME/usr/local/lib/python2.7/site-packages
252 ipcontroller profile_dir={profile_dir}
256 253
257 254
258 255 Once you have created these scripts, save them with names like
259 256 :file:`pbs.engine.template`. Now you can load them into the :file:`ipcluster_config` with:
260 257
261 258 .. sourcecode:: python
262 259
263 260 c.PBSEngineSetLauncher.batch_template_file = "pbs.engine.template"
264 261
265 262 c.PBSControllerLauncher.batch_template_file = "pbs.controller.template"
266 263
267 264
268 265 Alternately, you can just define the templates as strings inside :file:`ipcluster_config`.
269 266
270 267 Whether you are using your own templates or our defaults, the extra configurables available are
271 the number of engines to launch (``$n``, and the batch system queue to which the jobs are to be
272 submitted (``$queue``)). These are configurables, and can be specified in
268 the number of engines to launch (``{n}``, and the batch system queue to which the jobs are to be
269 submitted (``{queue}``)). These are configurables, and can be specified in
273 270 :file:`ipcluster_config`:
274 271
275 272 .. sourcecode:: python
276 273
277 274 c.PBSLauncher.queue = 'veryshort.q'
278 c.PBSEngineSetLauncher.n = 64
275 c.IPClusterEnginesApp.n = 64
279 276
280 277 Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior
281 278 of listening only on localhost is likely too restrictive. In this case, also assuming the
282 279 nodes are safely behind a firewall, you can simply instruct the Controller to listen for
283 280 connections on all its interfaces, by adding in :file:`ipcontroller_config`:
284 281
285 282 .. sourcecode:: python
286 283
287 284 c.RegistrationFactory.ip = '*'
288 285
289 286 You can now run the cluster with::
290 287
291 288 $ ipcluster start profile=pbs n=128
292 289
293 290 Additional configuration options can be found in the PBS section of :file:`ipcluster_config`.
294 291
295 292 .. note::
296 293
297 294 Due to the flexibility of configuration, the PBS launchers work with simple changes
298 295 to the template for other :command:`qsub`-using systems, such as Sun Grid Engine,
299 296 and with further configuration in similar batch systems like Condor.
300 297
301 298
302 299 Using :command:`ipcluster` in SSH mode
303 300 ---------------------------------------
304 301
305 302
306 303 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
307 304 nodes and :command:`ipcontroller` can be run remotely as well, or on localhost.
308 305
309 306 .. note::
310 307
311 308 When using this mode it highly recommended that you have set up SSH keys
312 309 and are using ssh-agent [SSH]_ for password-less logins.
313 310
314 311 As usual, we start by creating a clean profile::
315 312
316 $ ipcluster create profile= ssh
313 $ ipcluster create profile=ssh
317 314
318 315 To use this mode, select the SSH launchers in :file:`ipcluster_config.py`:
319 316
320 317 .. sourcecode:: python
321 318
322 319 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
323 320 # and if the Controller is also to be remote:
324 321 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.SSHControllerLauncher'
325 322
326 323
327 324 The controller's remote location and configuration can be specified:
328 325
329 326 .. sourcecode:: python
330 327
331 328 # Set the user and hostname for the controller
332 329 # c.SSHControllerLauncher.hostname = 'controller.example.com'
333 330 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
334 331
335 332 # Set the arguments to be passed to ipcontroller
336 333 # note that remotely launched ipcontroller will not get the contents of
337 334 # the local ipcontroller_config.py unless it resides on the *remote host*
338 # in the location specified by the `cluster_dir` argument.
339 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
335 # in the location specified by the `profile_dir` argument.
336 # c.SSHControllerLauncher.program_args = ['--reuse', 'ip=0.0.0.0', 'profile_dir=/path/to/cd']
340 337
341 338 .. note::
342 339
343 340 SSH mode does not do any file movement, so you will need to distribute configuration
344 341 files manually. To aid in this, the `reuse_files` flag defaults to True for ssh-launched
345 342 Controllers, so you will only need to do this once, unless you override this flag back
346 343 to False.
347 344
348 345 Engines are specified in a dictionary, by hostname and the number of engines to be run
349 346 on that host.
350 347
351 348 .. sourcecode:: python
352 349
353 350 c.SSHEngineSetLauncher.engines = { 'host1.example.com' : 2,
354 351 'host2.example.com' : 5,
355 'host3.example.com' : (1, ['cluster_dir=/home/different/location']),
352 'host3.example.com' : (1, ['profile_dir=/home/different/location']),
356 353 'host4.example.com' : 8 }
357 354
358 355 * The `engines` dict, where the keys are the host we want to run engines on and
359 356 the value is the number of engines to run on that host.
360 357 * on host3, the value is a tuple, where the number of engines is first, and the arguments
361 358 to be passed to :command:`ipengine` are the second element.
362 359
363 360 For engines without explicitly specified arguments, the default arguments are set in
364 361 a single location:
365 362
366 363 .. sourcecode:: python
367 364
368 c.SSHEngineSetLauncher.engine_args = ['--cluster_dir', '/path/to/cluster_ssh']
365 c.SSHEngineSetLauncher.engine_args = ['profile_dir=/path/to/cluster_ssh']
369 366
370 367 Current limitations of the SSH mode of :command:`ipcluster` are:
371 368
372 369 * Untested on Windows. Would require a working :command:`ssh` on Windows.
373 370 Also, we are using shell scripts to setup and execute commands on remote
374 371 hosts.
375 372 * No file movement -
376 373
377 374 Using the :command:`ipcontroller` and :command:`ipengine` commands
378 375 ====================================================================
379 376
380 377 It is also possible to use the :command:`ipcontroller` and :command:`ipengine`
381 378 commands to start your controller and engines. This approach gives you full
382 379 control over all aspects of the startup process.
383 380
384 381 Starting the controller and engine on your local machine
385 382 --------------------------------------------------------
386 383
387 384 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
388 385 local machine, do the following.
389 386
390 387 First start the controller::
391 388
392 389 $ ipcontroller
393 390
394 391 Next, start however many instances of the engine you want using (repeatedly)
395 392 the command::
396 393
397 394 $ ipengine
398 395
399 396 The engines should start and automatically connect to the controller using the
400 397 JSON files in :file:`~/.ipython/cluster_default/security`. You are now ready to use the
401 398 controller and engines from IPython.
402 399
403 400 .. warning::
404 401
405 402 The order of the above operations may be important. You *must*
406 403 start the controller before the engines, unless you are reusing connection
407 404 information (via `-r`), in which case ordering is not important.
408 405
409 406 .. note::
410 407
411 408 On some platforms (OS X), to put the controller and engine into the
412 409 background you may need to give these commands in the form ``(ipcontroller
413 410 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
414 411 properly.
415 412
416 413 Starting the controller and engines on different hosts
417 414 ------------------------------------------------------
418 415
419 416 When the controller and engines are running on different hosts, things are
420 417 slightly more complicated, but the underlying ideas are the same:
421 418
422 419 1. Start the controller on a host using :command:`ipcontroller`.
423 420 2. Copy :file:`ipcontroller-engine.json` from :file:`~/.ipython/cluster_<profile>/security` on
424 421 the controller's host to the host where the engines will run.
425 422 3. Use :command:`ipengine` on the engine's hosts to start the engines.
426 423
427 424 The only thing you have to be careful of is to tell :command:`ipengine` where
428 425 the :file:`ipcontroller-engine.json` file is located. There are two ways you
429 426 can do this:
430 427
431 428 * Put :file:`ipcontroller-engine.json` in the :file:`~/.ipython/cluster_<profile>/security`
432 429 directory on the engine's host, where it will be found automatically.
433 430 * Call :command:`ipengine` with the ``--file=full_path_to_the_file``
434 431 flag.
435 432
436 433 The ``--file`` flag works like this::
437 434
438 435 $ ipengine --file=/path/to/my/ipcontroller-engine.json
439 436
440 437 .. note::
441 438
442 439 If the controller's and engine's hosts all have a shared file system
443 440 (:file:`~/.ipython/cluster_<profile>/security` is the same on all of them), then things
444 441 will just work!
445 442
446 443 Make JSON files persistent
447 444 --------------------------
448 445
449 446 At first glance it may seem that managing the JSON files is a bit
450 447 annoying. Going back to the house and key analogy, copying the JSON around
451 448 each time you start the controller is like having to make a new key every time
452 449 you want to unlock the door and enter your house. As with your house, you want
453 450 to be able to create the key (or JSON file) once, and then simply use it at
454 451 any point in the future.
455 452
456 453 To do this, the only thing you have to do is specify the `--reuse` flag, so that
457 454 the connection information in the JSON files remains accurate::
458 455
459 456 $ ipcontroller --reuse
460 457
461 458 Then, just copy the JSON files over the first time and you are set. You can
462 459 start and stop the controller and engines as many times as you want in the
463 460 future, just make sure to tell the controller to reuse the file.
464 461
465 462 .. note::
466 463
467 464 You may ask the question: what ports does the controller listen on if you
468 465 don't tell it to use specific ones? The default is to use high random port
469 466 numbers. We do this for two reasons: i) to increase security through
470 467 obscurity and ii) to allow multiple controllers on a given host to start and
471 468 automatically use different ports.
472 469
473 470 Log files
474 471 ---------
475 472
476 473 All of the components of IPython have log files associated with them.
477 474 These log files can be extremely useful in debugging problems with
478 475 IPython and can be found in the directory :file:`~/.ipython/cluster_<profile>/log`.
479 476 Sending the log files to us will often help us to debug any problems.
480 477
481 478
482 479 Configuring `ipcontroller`
483 480 ---------------------------
484 481
485 482 Ports and addresses
486 483 *******************
487 484
488 485
489 486 Database Backend
490 487 ****************
491 488
492 489
493 490 .. seealso::
494 491
495 492
496 493
497 494 Configuring `ipengine`
498 495 -----------------------
499 496
500 497 .. note::
501 498
502 499 TODO
503 500
504 501
505 502
506 503 .. [PBS] Portable Batch System. http://www.openpbs.org/
507 504 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now