##// END OF EJS Templates
More work on the launchers and Win HPC support.
Brian Granger -
Show More
@@ -8,26 +8,34 b' c = get_config()'
8
8
9 # This allows you to control what method is used to start the controller
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
10 # and engines. The following methods are currently supported:
11 # * Start as a regular process on localhost.
11 # - Start as a regular process on localhost.
12 # * Start using mpiexec.
12 # - Start using mpiexec.
13 # * Start using PBS
13 # - Start using the Windows HPC Server 2008 scheduler
14 # * Start using SSH (currently broken)
14 # - Start using PBS
15 # - Start using SSH (currently broken)
16
15
17
16 # The selected launchers can be configured below.
18 # The selected launchers can be configured below.
17
19
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
20 # Options are:
19 # PBSControllerLauncher, WindowsHPCControllerLauncher)
21 # - LocalControllerLauncher
22 # - MPIExecControllerLauncher
23 # - PBSControllerLauncher
24 # - WindowsHPCControllerLauncher
20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
25 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
21
26
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
27 # Options are:
23 # PBSEngineSetLauncher)
28 # - LocalEngineSetLauncher
29 # - MPIExecEngineSetLauncher
30 # - PBSEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
24 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
32 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
25
33
26 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
27 # Global configuration
35 # Global configuration
28 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
29
37
30 # The default number of engine that will be started. This is overridden by
38 # The default number of engines that will be started. This is overridden by
31 # the -n command line option: "ipcluster start -n 4"
39 # the -n command line option: "ipcluster start -n 4"
32 # c.Global.n = 2
40 # c.Global.n = 2
33
41
@@ -41,18 +49,31 b' c = get_config()'
41 # to change to this directory before starting.
49 # to change to this directory before starting.
42 # c.Global.working_dir = os.getcwd()
50 # c.Global.working_dir = os.getcwd()
43
51
52
44 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
45 # Controller launcher configuration
54 # Local process launchers
46 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
47
56
48 # Configure how the controller is started. The configuration of the controller
57 # The working directory for the controller
49 # can also bet setup by editing the controller config file:
58 # c.LocalControllerLauncher.working_dir = u''
50 # ipcontroller_config.py
51
59
52 # The command line arguments to call the controller with.
60 # The command line arguments to call the controller with.
53 # c.LocalControllerLauncher.controller_args = \
61 # c.LocalControllerLauncher.controller_args = \
54 # ['--log-to-file','--log-level', '40']
62 # ['--log-to-file','--log-level', '40']
55
63
64 # The working directory for the engines
65 # c.LocalEngineSetLauncher.working_dir = u''
66
67 # Command line argument passed to the engines.
68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
69
70 #-----------------------------------------------------------------------------
71 # MPIExec launchers
72 #-----------------------------------------------------------------------------
73
74 # The working directory for the controller
75 # c.MPIExecControllerLauncher.working_dir = u''
76
56 # The mpiexec/mpirun command to use when starting the controller.
77 # The mpiexec/mpirun command to use when starting the controller.
57 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
78 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
58
79
@@ -63,6 +84,36 b' c = get_config()'
63 # c.MPIExecControllerLauncher.controller_args = \
84 # c.MPIExecControllerLauncher.controller_args = \
64 # ['--log-to-file','--log-level', '40']
85 # ['--log-to-file','--log-level', '40']
65
86
87
88 # The working directory for the engines
89 # c.MPIExecEngineSetLauncher.working_dir = u''
90
91 # The mpiexec/mpirun command to use when starting the engines.
92 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
93
94 # Additional arguments to pass to the actual mpiexec command.
95 # c.MPIExecEngineSetLauncher.mpi_args = []
96
97 # Command line argument passed to the engines.
98 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
99
100 # The default number of engines to start if not given elsewhere.
101 # c.MPIExecEngineSetLauncher.n = 1
102
103 #-----------------------------------------------------------------------------
104 # SSH launchers
105 #-----------------------------------------------------------------------------
106
107 # Todo
108
109
110 #-----------------------------------------------------------------------------
111 # Unix batch (PBS) scheduler launchers
112 #-----------------------------------------------------------------------------
113
114 # The working directory for the controller
115 # c.PBSControllerLauncher.working_dir = u''
116
66 # The command line program to use to submit a PBS job.
117 # The command line program to use to submit a PBS job.
67 # c.PBSControllerLauncher.submit_command = 'qsub'
118 # c.PBSControllerLauncher.submit_command = 'qsub'
68
119
@@ -82,63 +133,9 b' c = get_config()'
82 # submit the job. This will be written to the cluster directory.
133 # submit the job. This will be written to the cluster directory.
83 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
134 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
84
135
85 #-----------------------------------------------------------------------------
86 # Windows HPC Server 2008 launcher configuration
87 #-----------------------------------------------------------------------------
88
89 # c.WinHPCJob.username = 'DOMAIN\\user'
90 # c.WinHPCJob.priority = 'Highest'
91 # c.WinHPCJob.requested_nodes = ''
92 # c.WinHPCJob.project = ''
93 # c.WinHPCJob.is_exclusive = False
94
95 # c.WinHPCTask.environment_variables = {}
96 # c.WinHPCTask.work_directory = ''
97 # c.WinHPCTask.is_rerunnable = True
98
99 # c.IPControllerTask.task_name = 'IPController'
100 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
101 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
102 # c.IPControllerTask.environment_variables = {}
103
104 # c.IPEngineTask.task_name = 'IPController'
105 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
106 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
107 # c.IPEngineTask.environment_variables = {}
108
109 # c.WindowsHPCLauncher.scheduler = 'HEADNODE'
110 # c.WindowsHPCLauncher.username = '\\DOMAIN\USERNAME'
111 # c.WindowsHPCLauncher.priority = 'Highest'
112 # c.WindowsHPCLauncher.requested_nodes = ''
113 # c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml'
114 # c.WindowsHPCLauncher.project = 'MyProject'
115
116 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
117 # c.WindowsHPCControllerLauncher.username = '\\DOMAIN\USERNAME'
118 # c.WindowsHPCControllerLauncher.priority = 'Highest'
119 # c.WindowsHPCControllerLauncher.requested_nodes = ''
120 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
121 # c.WindowsHPCControllerLauncher.project = 'MyProject'
122
123
124 #-----------------------------------------------------------------------------
125 # Engine launcher configuration
126 #-----------------------------------------------------------------------------
127
128 # Command line argument passed to the engines.
129 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
130
131 # The mpiexec/mpirun command to use in started the controller.
132 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
133
134 # Additional arguments to pass to the actual mpiexec command.
135 # c.MPIExecEngineSetLauncher.mpi_args = []
136
137 # Command line argument passed to the engines.
138 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
139
136
140 # The default number of engines to start if not given elsewhere.
137 # The working directory for the engines
141 # c.MPIExecEngineSetLauncher.n = 1
138 # c.PBSEngineSetLauncher.working_dir = u''
142
139
143 # The command line program to use to submit a PBS job.
140 # The command line program to use to submit a PBS job.
144 # c.PBSEngineSetLauncher.submit_command = 'qsub'
141 # c.PBSEngineSetLauncher.submit_command = 'qsub'
@@ -161,37 +158,44 b' c = get_config()'
161 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
158 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
162
159
163 #-----------------------------------------------------------------------------
160 #-----------------------------------------------------------------------------
164 # Base launcher configuration
161 # Windows HPC Server 2008 launcher configuration
165 #-----------------------------------------------------------------------------
162 #-----------------------------------------------------------------------------
166
163
167 # The various launchers are organized into an inheritance hierarchy.
164 # c.IPControllerJob.job_name = 'IPController'
168 # The configurations can also be iherited and the following attributes
165 # c.IPControllerJob.is_exclusive = False
169 # allow you to configure the base classes.
166 # c.IPControllerJob.username = 'USERDOMAIN\\USERNAME'
170
167 # c.IPControllerJob.priority = 'Highest'
171 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
168 # c.IPControllerJob.requested_nodes = ''
172 # c.MPIExecLauncher.mpi_args = []
169 # c.IPControllerJob.project = 'MyProject'
173 # c.MPIExecLauncher.program = []
170
174 # c.MPIExecLauncher.program_args = []
171 # c.IPControllerTask.task_name = 'IPController'
175 # c.MPIExecLauncher.n = 1
172 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
176
173 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
177 # c.SSHLauncher.ssh_cmd = ['ssh']
174 # c.IPControllerTask.environment_variables = {}
178 # c.SSHLauncher.ssh_args = []
175
179 # c.SSHLauncher.program = []
176 # c.WindowsHPCControllerLauncher.working_dir = u''
180 # s.SSHLauncher.program_args = []
177 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
181 # c.SSHLauncher.hostname = ''
178 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
182 # c.SSHLauncher.user = os.environ['USER']
179
183
180
184 # c.BatchSystemLauncher.submit_command
181 # c.IPEngineSetJob.job_name = 'IPEngineSet'
185 # c.BatchSystemLauncher.delete_command
182 # c.IPEngineSetJob.is_exclusive = False
186 # c.BatchSystemLauncher.job_id_regexp
183 # c.IPEngineSetJob.username = 'USERDOMAIN\\USERNAME'
187 # c.BatchSystemLauncher.batch_template
184 # c.IPEngineSetJob.priority = 'Highest'
188 # c.BatchSystemLauncher.batch_file_name
185 # c.IPEngineSetJob.requested_nodes = ''
189
186 # c.IPEngineSetJob.project = 'MyProject'
190 # c.PBSLauncher.submit_command = 'qsub'
187
191 # c.PBSLauncher.delete_command = 'qdel'
188 # c.IPEngineTask.task_name = 'IPEngine'
192 # c.PBSLauncher.job_id_regexp = '\d+'
189 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
193 # c.PBSLauncher.batch_template = """"""
190 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
194 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
191 # c.IPEngineTask.environment_variables = {}
192
193 # c.WindowsHPCEngineSetLauncher.working_dir = u''
194 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
195 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
196
197
198
195
199
196
200
197
201
This diff has been collapsed as it changes many lines, (568 lines changed) Show them Hide them
@@ -1,7 +1,7 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
@@ -26,7 +26,8 b' from IPython.utils.platutils import find_cmd'
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask
29 IPControllerTask, IPEngineTask,
30 IPControllerJob, IPEngineSetJob
30 )
31 )
31
32
32 from twisted.internet import reactor, defer
33 from twisted.internet import reactor, defer
@@ -38,7 +39,48 b' from twisted.python import log'
38 from twisted.python.failure import Failure
39 from twisted.python.failure import Failure
39
40
40 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
41 # Generic launchers
42 # Utilities
43 #-----------------------------------------------------------------------------
44
45
46 def find_controller_cmd():
47 """Find the command line ipcontroller program in a cross platform way."""
48 if sys.platform == 'win32':
49 # This logic is needed because the ipcontroller script doesn't
50 # always get installed in the same way or in the same location.
51 from IPython.kernel import ipcontrollerapp
52 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
53 # The -u option here turns on unbuffered output, which is required
54 # on Win32 to prevent wierd conflict and problems with Twisted.
55 # Also, use sys.executable to make sure we are picking up the
56 # right python exe.
57 cmd = [sys.executable, '-u', script_location]
58 else:
59 # ipcontroller has to be on the PATH in this case.
60 cmd = ['ipcontroller']
61 return cmd
62
63
64 def find_engine_cmd():
65 """Find the command line ipengine program in a cross platform way."""
66 if sys.platform == 'win32':
67 # This logic is needed because the ipengine script doesn't
68 # always get installed in the same way or in the same location.
69 from IPython.kernel import ipengineapp
70 script_location = ipengineapp.__file__.replace('.pyc', '.py')
71 # The -u option here turns on unbuffered output, which is required
72 # on Win32 to prevent wierd conflict and problems with Twisted.
73 # Also, use sys.executable to make sure we are picking up the
74 # right python exe.
75 cmd = [sys.executable, '-u', script_location]
76 else:
77 # ipcontroller has to be on the PATH in this case.
78 cmd = ['ipengine']
79 return cmd
80
81
82 #-----------------------------------------------------------------------------
83 # Base launchers and errors
42 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
43
85
44
86
@@ -57,8 +99,6 b' class UnknownStatus(LauncherError):'
57 class BaseLauncher(Component):
99 class BaseLauncher(Component):
58 """An asbtraction for starting, stopping and signaling a process."""
100 """An asbtraction for starting, stopping and signaling a process."""
59
101
60 # A directory for files related to the process. But, we don't cd to
61 # this directory,
62 working_dir = Unicode(u'')
102 working_dir = Unicode(u'')
63
103
64 def __init__(self, working_dir, parent=None, name=None, config=None):
104 def __init__(self, working_dir, parent=None, name=None, config=None):
@@ -181,6 +221,11 b' class BaseLauncher(Component):'
181 )
221 )
182
222
183
223
224 #-----------------------------------------------------------------------------
225 # Local process launchers
226 #-----------------------------------------------------------------------------
227
228
184 class LocalProcessLauncherProtocol(ProcessProtocol):
229 class LocalProcessLauncherProtocol(ProcessProtocol):
185 """A ProcessProtocol to go with the LocalProcessLauncher."""
230 """A ProcessProtocol to go with the LocalProcessLauncher."""
186
231
@@ -278,6 +323,112 b' class LocalProcessLauncher(BaseLauncher):'
278 yield self.signal('KILL')
323 yield self.signal('KILL')
279
324
280
325
326 class LocalControllerLauncher(LocalProcessLauncher):
327 """Launch a controller as a regular external process."""
328
329 controller_cmd = List(find_controller_cmd(), config=True)
330 # Command line arguments to ipcontroller.
331 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
332
333 def find_args(self):
334 return self.controller_cmd + self.controller_args + \
335 ['--working-dir', self.working_dir]
336
337 def start(self, cluster_dir):
338 """Start the controller by cluster_dir."""
339 self.controller_args.extend(['--cluster-dir', cluster_dir])
340 self.cluster_dir = unicode(cluster_dir)
341 log.msg("Starting LocalControllerLauncher: %r" % self.args)
342 return super(LocalControllerLauncher, self).start()
343
344
345 class LocalEngineLauncher(LocalProcessLauncher):
346 """Launch a single engine as a regular externall process."""
347
348 engine_cmd = List(find_engine_cmd(), config=True)
349 # Command line arguments for ipengine.
350 engine_args = List(
351 ['--log-to-file','--log-level', '40'], config=True
352 )
353
354 def find_args(self):
355 return self.engine_cmd + self.engine_args + \
356 ['--working-dir', self.working_dir]
357
358 def start(self, cluster_dir):
359 """Start the engine by cluster_dir."""
360 self.engine_args.extend(['--cluster-dir', cluster_dir])
361 self.cluster_dir = unicode(cluster_dir)
362 return super(LocalEngineLauncher, self).start()
363
364
365 class LocalEngineSetLauncher(BaseLauncher):
366 """Launch a set of engines as regular external processes."""
367
368 # Command line arguments for ipengine.
369 engine_args = List(
370 ['--log-to-file','--log-level', '40'], config=True
371 )
372
373 def __init__(self, working_dir, parent=None, name=None, config=None):
374 super(LocalEngineSetLauncher, self).__init__(
375 working_dir, parent, name, config
376 )
377 self.launchers = []
378
379 def start(self, n, cluster_dir):
380 """Start n engines by profile or cluster_dir."""
381 self.cluster_dir = unicode(cluster_dir)
382 dlist = []
383 for i in range(n):
384 el = LocalEngineLauncher(self.working_dir, self)
385 # Copy the engine args over to each engine launcher.
386 import copy
387 el.engine_args = copy.deepcopy(self.engine_args)
388 d = el.start(cluster_dir)
389 if i==0:
390 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
391 self.launchers.append(el)
392 dlist.append(d)
393 # The consumeErrors here could be dangerous
394 dfinal = gatherBoth(dlist, consumeErrors=True)
395 dfinal.addCallback(self.notify_start)
396 return dfinal
397
398 def find_args(self):
399 return ['engine set']
400
401 def signal(self, sig):
402 dlist = []
403 for el in self.launchers:
404 d = el.signal(sig)
405 dlist.append(d)
406 dfinal = gatherBoth(dlist, consumeErrors=True)
407 return dfinal
408
409 def interrupt_then_kill(self, delay=1.0):
410 dlist = []
411 for el in self.launchers:
412 d = el.interrupt_then_kill(delay)
413 dlist.append(d)
414 dfinal = gatherBoth(dlist, consumeErrors=True)
415 return dfinal
416
417 def stop(self):
418 return self.interrupt_then_kill()
419
420 def observe_stop(self):
421 dlist = [el.observe_stop() for el in self.launchers]
422 dfinal = gatherBoth(dlist, consumeErrors=False)
423 dfinal.addCallback(self.notify_stop)
424 return dfinal
425
426
427 #-----------------------------------------------------------------------------
428 # MPIExec launchers
429 #-----------------------------------------------------------------------------
430
431
281 class MPIExecLauncher(LocalProcessLauncher):
432 class MPIExecLauncher(LocalProcessLauncher):
282 """Launch an external process using mpiexec."""
433 """Launch an external process using mpiexec."""
283
434
@@ -303,6 +454,54 b' class MPIExecLauncher(LocalProcessLauncher):'
303 return super(MPIExecLauncher, self).start()
454 return super(MPIExecLauncher, self).start()
304
455
305
456
457 class MPIExecControllerLauncher(MPIExecLauncher):
458 """Launch a controller using mpiexec."""
459
460 controller_cmd = List(find_controller_cmd(), config=True)
461 # Command line arguments to ipcontroller.
462 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
463 n = Int(1, config=False)
464
465 def start(self, cluster_dir):
466 """Start the controller by cluster_dir."""
467 self.controller_args.extend(['--cluster-dir', cluster_dir])
468 self.cluster_dir = unicode(cluster_dir)
469 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
470 return super(MPIExecControllerLauncher, self).start(1)
471
472 def find_args(self):
473 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
474 self.controller_cmd + self.controller_args + \
475 ['--working-dir', self.working_dir]
476
477
478 class MPIExecEngineSetLauncher(MPIExecLauncher):
479
480 engine_cmd = List(find_engine_cmd(), config=True)
481 # Command line arguments for ipengine.
482 engine_args = List(
483 ['--log-to-file','--log-level', '40'], config=True
484 )
485 n = Int(1, config=True)
486
487 def start(self, n, cluster_dir):
488 """Start n engines by profile or cluster_dir."""
489 self.engine_args.extend(['--cluster-dir', cluster_dir])
490 self.cluster_dir = unicode(cluster_dir)
491 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
492 return super(MPIExecEngineSetLauncher, self).start(n)
493
494 def find_args(self):
495 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
496 self.engine_cmd + self.engine_args + \
497 ['--working-dir', self.working_dir]
498
499
500 #-----------------------------------------------------------------------------
501 # SSH launchers
502 #-----------------------------------------------------------------------------
503
504
306 class SSHLauncher(BaseLauncher):
505 class SSHLauncher(BaseLauncher):
307 """A minimal launcher for ssh.
506 """A minimal launcher for ssh.
308
507
@@ -337,11 +536,25 b' class SSHLauncher(BaseLauncher):'
337 return super(SSHLauncher, self).start()
536 return super(SSHLauncher, self).start()
338
537
339
538
539 class SSHControllerLauncher(SSHLauncher):
540 pass
541
542
543 class SSHEngineSetLauncher(BaseLauncher):
544 pass
545
546
547 #-----------------------------------------------------------------------------
548 # Windows HPC Server 2008 scheduler launchers
549 #-----------------------------------------------------------------------------
550
551
340 # This is only used on Windows.
552 # This is only used on Windows.
341 if os.name=='nt':
553 def find_job_cmd():
342 job_cmd = find_cmd('job')
554 if os.name=='nt':
343 else:
555 return find_cmd('job')
344 job_cmd = 'job'
556 else:
557 return 'job'
345
558
346
559
347 class WindowsHPCLauncher(BaseLauncher):
560 class WindowsHPCLauncher(BaseLauncher):
@@ -355,13 +568,8 b' class WindowsHPCLauncher(BaseLauncher):'
355 # by combining the working_dir with the job_file_name.
568 # by combining the working_dir with the job_file_name.
356 job_file = Unicode(u'')
569 job_file = Unicode(u'')
357 # The hostname of the scheduler to submit the job to
570 # The hostname of the scheduler to submit the job to
358 scheduler = Str('HEADNODE', config=True)
571 scheduler = Str('', config=True)
359 username = Str(os.environ.get('USERNAME', ''), config=True)
572 job_cmd = Str(find_job_cmd, config=True)
360 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
361 default_value='Highest', config=True)
362 requested_nodes = Str('', config=True)
363 project = Str('MyProject', config=True)
364 job_cmd = Str(job_cmd, config=True)
365
573
366 def __init__(self, working_dir, parent=None, name=None, config=None):
574 def __init__(self, working_dir, parent=None, name=None, config=None):
367 super(WindowsHPCLauncher, self).__init__(
575 super(WindowsHPCLauncher, self).__init__(
@@ -369,6 +577,10 b' class WindowsHPCLauncher(BaseLauncher):'
369 )
577 )
370 self.job_file = os.path.join(self.working_dir, self.job_file_name)
578 self.job_file = os.path.join(self.working_dir, self.job_file_name)
371
579
580 @property
581 def job_file(self):
582 return os.path.join(self.working_dir, self.job_file_name)
583
372 def write_job_file(self, n):
584 def write_job_file(self, n):
373 raise NotImplementedError("Implement write_job_file in a subclass.")
585 raise NotImplementedError("Implement write_job_file in a subclass.")
374
586
@@ -423,7 +635,79 b' class WindowsHPCLauncher(BaseLauncher):'
423 output = 'The job already appears to be stoppped: %r' % self.job_id
635 output = 'The job already appears to be stoppped: %r' % self.job_id
424 self.notify_stop(output) # Pass the output of the kill cmd
636 self.notify_stop(output) # Pass the output of the kill cmd
425 defer.returnValue(output)
637 defer.returnValue(output)
426
638
639
640 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
641
642 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
643 extra_args = List([], config=False)
644
645 def write_job_file(self, n):
646 job = IPControllerJob(self)
647
648 t = IPControllerTask(self)
649 # The tasks work directory is *not* the actual work directory of
650 # the controller. It is used as the base path for the stdout/stderr
651 # files that the scheduler redirects to.
652 t.work_directory = self.cluster_dir
653 # Add the --cluster-dir and --working-dir from self.start().
654 t.controller_args.extend(self.extra_args)
655 job.add_task(t)
656
657 log.msg("Writing job description file: %s" % self.job_file)
658 job.write(self.job_file)
659
660 @property
661 def job_file(self):
662 return os.path.join(self.cluster_dir, self.job_file_name)
663
664 def start(self, cluster_dir):
665 """Start the controller by cluster_dir."""
666 self.extra_args = [
667 '--cluster-dir', cluster_dir, '--working-dir', self.working_dir
668 ]
669 self.cluster_dir = unicode(cluster_dir)
670 return super(WindowsHPCControllerLauncher, self).start(1)
671
672
673 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
674
675 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
676 extra_args = List([], config=False)
677
678 def write_job_file(self, n):
679 job = IPControllerJob(self)
680
681 for i in range(n):
682 t = IPEngineTask(self)
683 # The tasks work directory is *not* the actual work directory of
684 # the engine. It is used as the base path for the stdout/stderr
685 # files that the scheduler redirects to.
686 t.work_directory = self.cluster_dir
687 # Add the --cluster-dir and --working-dir from self.start().
688 t.engine_args.extend(self.extra_args)
689 job.add_task(t)
690
691 log.msg("Writing job description file: %s" % self.job_file)
692 job.write(self.job_file)
693
694 @property
695 def job_file(self):
696 return os.path.join(self.cluster_dir, self.job_file_name)
697
698 def start(self, cluster_dir):
699 """Start the controller by cluster_dir."""
700 self.extra_args = [
701 '--cluster-dir', cluster_dir, '--working-dir', self.working_dir
702 ]
703 self.cluster_dir = unicode(cluster_dir)
704 return super(WindowsHPCControllerLauncher, self).start(n)
705
706
707 #-----------------------------------------------------------------------------
708 # Batch (PBS) system launchers
709 #-----------------------------------------------------------------------------
710
427
711
428 class BatchSystemLauncher(BaseLauncher):
712 class BatchSystemLauncher(BaseLauncher):
429 """Launch an external process using a batch system.
713 """Launch an external process using a batch system.
@@ -511,272 +795,34 b' class PBSLauncher(BatchSystemLauncher):'
511 batch_file = Unicode(u'')
795 batch_file = Unicode(u'')
512
796
513
797
514 #-----------------------------------------------------------------------------
515 # Controller launchers
516 #-----------------------------------------------------------------------------
517
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way.

    Returns
    -------
    list
        The argv prefix used to run ipcontroller.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        script_location = ipcontrollerapp.__file__
        # Map a compiled file back to its source by swapping only the
        # extension; a plain str.replace would also rewrite '.pyc' appearing
        # elsewhere in the path and would miss '.pyo'.
        root, ext = os.path.splitext(script_location)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd
534
535
class LocalControllerLauncher(LocalProcessLauncher):
    """Run the controller as an ordinary external process."""

    controller_cmd = List(find_controller_cmd(), config=True)
    # Arguments placed after the ipcontroller command.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile=None, cluster_dir=None):
        """Start the controller, selecting it by profile and/or cluster_dir.

        Each option that is not None is appended to ``controller_args``
        before the process is started.
        """
        options = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in options:
            if value is not None:
                self.controller_args.extend([flag, value])
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
554
555
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch the controller as a single-task job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Filled in by start(); deliberately not configurable.
    extra_args = List([],config=False)

    def write_job_file(self, n):
        """Write the job description file for the controller job.

        ``n`` is accepted for interface compatibility; exactly one
        controller task is always added.
        """
        job = WinHPCJob(self)
        job.job_name = "IPController"
        job.username = self.username
        job.priority = self.priority
        job.requested_nodes = self.requested_nodes
        job.project = self.project

        t = IPControllerTask(self)
        t.work_directory = self.working_dir
        # Add the --profile and --cluster-dir args from start.
        t.controller_args.extend(self.extra_args)
        job.add_task(t)
        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir.

        When both are given, both options are passed on. Previously the
        ``--profile`` pair replaced the ``--cluster-dir`` pair instead of
        being added to it.
        """
        extra = []
        if cluster_dir is not None:
            extra.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra.extend(['--profile', profile])
        # Leave extra_args untouched when neither option was supplied.
        if extra:
            self.extra_args = extra
        return super(WindowsHPCControllerLauncher, self).start(1)
584
585
class MPIExecControllerLauncher(MPIExecLauncher):
    """Run the controller under mpiexec."""

    controller_cmd = List(find_controller_cmd(), config=True)
    # Arguments placed after the ipcontroller command.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    n = Int(1, config=False)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller, selecting it by profile and/or cluster_dir.

        Each option that is not None is appended to ``controller_args``;
        the parent launcher is then started with a count of 1.
        """
        options = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in options:
            if value is not None:
                self.controller_args.extend([flag, value])
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the mpiexec command line that wraps the controller command."""
        mpi_part = self.mpi_cmd + ['-n', self.n] + self.mpi_args
        controller_part = self.controller_cmd + self.controller_args
        return mpi_part + controller_part
606
607
608 class PBSControllerLauncher(PBSLauncher):
798 class PBSControllerLauncher(PBSLauncher):
609 """Launch a controller using PBS."""
799 """Launch a controller using PBS."""
610
800
611 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
801 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
612
802
613 def start(self, profile=None, cluster_dir=None):
803 def start(self, cluster_dir):
614 """Start the controller by profile or cluster_dir."""
804 """Start the controller by profile or cluster_dir."""
615 # Here we save profile and cluster_dir in the context so they
805 # Here we save profile and cluster_dir in the context so they
616 # can be used in the batch script template as ${profile} and
806 # can be used in the batch script template as ${profile} and
617 # ${cluster_dir}
807 # ${cluster_dir}
618 if cluster_dir is not None:
808 self.context['cluster_dir'] = cluster_dir
619 self.context['cluster_dir'] = cluster_dir
809 self.cluster_dir = unicode(cluster_dir)
620 if profile is not None:
621 self.context['profile'] = profile
622 log.msg("Starting PBSControllerLauncher: %r" % self.args)
810 log.msg("Starting PBSControllerLauncher: %r" % self.args)
623 return super(PBSControllerLauncher, self).start(1)
811 return super(PBSControllerLauncher, self).start(1)
624
812
625
813
class SSHControllerLauncher(SSHLauncher):
    """Placeholder controller launcher; adds nothing to SSHLauncher."""
628
629
630 #-----------------------------------------------------------------------------
631 # Engine launchers
632 #-----------------------------------------------------------------------------
633
634
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way.

    Returns
    -------
    list
        The argv prefix used to run ipengine.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        script_location = ipengineapp.__file__
        # Map a compiled file back to its source by swapping only the
        # extension; a plain str.replace would also rewrite '.pyc' appearing
        # elsewhere in the path and would miss '.pyo'.
        root, ext = os.path.splitext(script_location)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd
651
652
class LocalEngineLauncher(LocalProcessLauncher):
    """Run one engine as an ordinary external process."""

    engine_cmd = List(find_engine_cmd(), config=True)
    # Arguments placed after the ipengine command.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        """Return the command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, profile=None, cluster_dir=None):
        """Start the engine, selecting it by profile and/or cluster_dir.

        Each option that is not None is appended to ``engine_args`` before
        the process is started.
        """
        options = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in options:
            if value is not None:
                self.engine_args.extend([flag, value])
        return super(LocalEngineLauncher, self).start()
672
673
class LocalEngineSetLauncher(BaseLauncher):
    """Run several engines, each as its own LocalEngineLauncher process."""

    # Arguments copied to every engine launcher that start() creates.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, working_dir, parent=None, name=None, config=None):
        """Initialise the base launcher and begin with no engine launchers."""
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.launchers = []

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Returns the deferred produced by gathering the per-engine start
        deferreds; ``notify_start`` is attached to it as a callback.
        """
        import copy
        start_deferreds = []
        for index in range(n):
            launcher = LocalEngineLauncher(self.working_dir, self)
            # Give each launcher its own copy of the argument list.
            launcher.engine_args = copy.deepcopy(self.engine_args)
            started = launcher.start(profile, cluster_dir)
            # Log the command line once, using the first engine.
            if index == 0:
                log.msg("Starting LocalEngineSetLauncher: %r" % launcher.args)
            self.launchers.append(launcher)
            start_deferreds.append(started)
        # The consumeErrors here could be dangerous
        gathered = gatherBoth(start_deferreds, consumeErrors=True)
        gathered.addCallback(self.notify_start)
        return gathered

    def find_args(self):
        """Return a fixed descriptive argument list for the whole set."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine launcher and gather the results."""
        results = [launcher.signal(sig) for launcher in self.launchers]
        return gatherBoth(results, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Call ``interrupt_then_kill(delay)`` on every engine launcher."""
        results = [
            launcher.interrupt_then_kill(delay) for launcher in self.launchers
        ]
        return gatherBoth(results, consumeErrors=True)

    def stop(self):
        """Stop all engines via interrupt_then_kill with its default delay."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Gather every launcher's stop deferred, then call notify_stop."""
        stop_deferreds = []
        for launcher in self.launchers:
            stop_deferreds.append(launcher.observe_stop())
        gathered = gatherBoth(stop_deferreds, consumeErrors=False)
        gathered.addCallback(self.notify_stop)
        return gathered
733
734
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Run a set of engines under mpiexec."""

    engine_cmd = List(find_engine_cmd(), config=True)
    # Arguments placed after the ipengine command.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    n = Int(1, config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Each option that is not None is appended to ``engine_args`` before
        the parent launcher is started with ``n``.
        """
        options = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in options:
            if value is not None:
                self.engine_args.extend([flag, value])
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the mpiexec command line that wraps the engine command."""
        mpi_part = self.mpi_cmd + ['-n', self.n] + self.mpi_args
        engine_part = self.engine_cmd + self.engine_args
        return mpi_part + engine_part
756
757
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Placeholder engine set launcher; adds nothing to WindowsHPCLauncher."""
760
761
762 class PBSEngineSetLauncher(PBSLauncher):
814 class PBSEngineSetLauncher(PBSLauncher):
763
815
764 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
816 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
765
817
766 def start(self, n, profile=None, cluster_dir=None):
818 def start(self, n, cluster_dir):
767 """Start n engines by profile or cluster_dir."""
819 """Start n engines by profile or cluster_dir."""
768 if cluster_dir is not None:
820 self.program_args.extend(['--cluster-dir', cluster_dir])
769 self.program_args.extend(['--cluster-dir', cluster_dir])
821 self.cluster_dir = unicode(cluster_dir)
770 if profile is not None:
771 self.program_args.extend(['-p', profile])
772 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
822 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
773 return super(PBSEngineSetLauncher, self).start(n)
823 return super(PBSEngineSetLauncher, self).start(n)
774
824
775
825
class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder engine set launcher; adds nothing to BaseLauncher."""
778
779
780 #-----------------------------------------------------------------------------
826 #-----------------------------------------------------------------------------
781 # A launcher for ipcluster itself!
827 # A launcher for ipcluster itself!
782 #-----------------------------------------------------------------------------
828 #-----------------------------------------------------------------------------
@@ -67,6 +67,15 b' def indent(elem, level=0):'
67 elem.tail = i
67 elem.tail = i
68
68
69
69
def find_username():
    """Return the user name from the environment, domain-qualified if set.

    Reads ``USERDOMAIN`` and ``USERNAME``. When a non-empty domain is
    present the result is the domain, a backslash, and the user name;
    otherwise it is just the user name (an empty string if ``USERNAME``
    is unset).
    """
    domain = os.environ.get('USERDOMAIN')
    username = os.environ.get('USERNAME', '')
    # Treat an empty USERDOMAIN like a missing one so the result never
    # starts with a bare backslash.
    if not domain:
        return username
    return '%s\\%s' % (domain, username)
77
78
70 class WinHPCJob(Component):
79 class WinHPCJob(Component):
71
80
72 job_id = Str('')
81 job_id = Str('')
@@ -82,7 +91,7 b' class WinHPCJob(Component):'
82 auto_calculate_max = Bool(True, config=True)
91 auto_calculate_max = Bool(True, config=True)
83 run_until_canceled = Bool(False, config=True)
92 run_until_canceled = Bool(False, config=True)
84 is_exclusive = Bool(False, config=True)
93 is_exclusive = Bool(False, config=True)
85 username = Str(os.environ.get('USERNAME', ''), config=True)
94 username = Str(find_username(), config=True)
86 job_type = Str('Batch', config=True)
95 job_type = Str('Batch', config=True)
87 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
96 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
88 default_value='Highest', config=True)
97 default_value='Highest', config=True)
@@ -216,7 +225,27 b' class WinHPCTask(Component):'
216
225
217
226
218 # By declaring these, we can configure the controller and engine separately!
227 # By declaring these, we can configure the controller and engine separately!
219
228
class IPControllerJob(WinHPCJob):
    """Job settings used for the controller, configurable on their own."""

    # Fixed job name; not configurable.
    job_name = Str('IPController', config=False)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest',
        config=True,
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
235 requested_nodes = Str('', config=True)
236 project = Str('IPython', config=True)
237
238
class IPEngineSetJob(WinHPCJob):
    """Job settings used for the engine set, configurable on their own."""

    # Fixed job name; not configurable.
    job_name = Str('IPEngineSet', config=False)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest',
        config=True,
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
245 requested_nodes = Str('', config=True)
246 project = Str('IPython', config=True)
247
248
220 class IPControllerTask(WinHPCTask):
249 class IPControllerTask(WinHPCTask):
221
250
222 task_name = Str('IPController', config=True)
251 task_name = Str('IPController', config=True)
General Comments 0
You need to be logged in to leave comments. Login now