##// END OF EJS Templates
Fixing minor bugs in IPython.kernel....
Brian Granger -
Show More
@@ -1,184 +1,184 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Select which launchers to use
6 # Select which launchers to use
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # This allows you to control what method is used to start the controller
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
10 # and engines. The following methods are currently supported:
11 # - Start as a regular process on localhost.
11 # - Start as a regular process on localhost.
12 # - Start using mpiexec.
12 # - Start using mpiexec.
13 # - Start using the Windows HPC Server 2008 scheduler
13 # - Start using the Windows HPC Server 2008 scheduler
14 # - Start using PBS
14 # - Start using PBS
15 # - Start using SSH (currently broken)
15 # - Start using SSH (currently broken)
16
16
17
17
18 # The selected launchers can be configured below.
18 # The selected launchers can be configured below.
19
19
20 # Options are:
20 # Options are:
21 # - LocalControllerLauncher
21 # - LocalControllerLauncher
22 # - MPIExecControllerLauncher
22 # - MPIExecControllerLauncher
23 # - PBSControllerLauncher
23 # - PBSControllerLauncher
24 # - WindowsHPCControllerLauncher
24 # - WindowsHPCControllerLauncher
25 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
25 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
26
26
27 # Options are:
27 # Options are:
28 # - LocalEngineSetLauncher
28 # - LocalEngineSetLauncher
29 # - MPIExecEngineSetLauncher
29 # - MPIExecEngineSetLauncher
30 # - PBSEngineSetLauncher
30 # - PBSEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
32 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
32 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Global configuration
35 # Global configuration
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 # The default number of engines that will be started. This is overridden by
38 # The default number of engines that will be started. This is overridden by
39 # the -n command line option: "ipcluster start -n 4"
39 # the -n command line option: "ipcluster start -n 4"
40 # c.Global.n = 2
40 # c.Global.n = 2
41
41
42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
43 # c.Global.log_to_file = False
43 # c.Global.log_to_file = False
44
44
45 # Remove old logs from cluster_dir/log before starting.
45 # Remove old logs from cluster_dir/log before starting.
46 # c.Global.clean_logs = True
46 # c.Global.clean_logs = True
47
47
48 # The working directory for the process. The application will use os.chdir
48 # The working directory for the process. The application will use os.chdir
49 # to change to this directory before starting.
49 # to change to this directory before starting.
50 # c.Global.work_dir = os.getcwd()
50 # c.Global.work_dir = os.getcwd()
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Local process launchers
54 # Local process launchers
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 # The command line arguments to call the controller with.
57 # The command line arguments to call the controller with.
58 # c.LocalControllerLauncher.controller_args = \
58 # c.LocalControllerLauncher.controller_args = \
59 # ['--log-to-file','--log-level', '40']
59 # ['--log-to-file','--log-level', '40']
60
60
61 # The working directory for the controller
61 # The working directory for the controller
62 # c.LocalEngineSetLauncher.work_dir = u''
62 # c.LocalEngineSetLauncher.work_dir = u''
63
63
64 # Command line argument passed to the engines.
64 # Command line argument passed to the engines.
65 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
65 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # MPIExec launchers
68 # MPIExec launchers
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70
70
71 # The mpiexec/mpirun command to use in started the controller.
71 # The mpiexec/mpirun command to use in started the controller.
72 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
72 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
73
73
74 # Additional arguments to pass to the actual mpiexec command.
74 # Additional arguments to pass to the actual mpiexec command.
75 # c.MPIExecControllerLauncher.mpi_args = []
75 # c.MPIExecControllerLauncher.mpi_args = []
76
76
77 # The command line argument to call the controller with.
77 # The command line argument to call the controller with.
78 # c.MPIExecControllerLauncher.controller_args = \
78 # c.MPIExecControllerLauncher.controller_args = \
79 # ['--log-to-file','--log-level', '40']
79 # ['--log-to-file','--log-level', '40']
80
80
81
81
82 # The mpiexec/mpirun command to use in started the controller.
82 # The mpiexec/mpirun command to use in started the controller.
83 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
83 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
84
84
85 # Additional arguments to pass to the actual mpiexec command.
85 # Additional arguments to pass to the actual mpiexec command.
86 # c.MPIExecEngineSetLauncher.mpi_args = []
86 # c.MPIExecEngineSetLauncher.mpi_args = []
87
87
88 # Command line argument passed to the engines.
88 # Command line argument passed to the engines.
89 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
89 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
90
90
91 # The default number of engines to start if not given elsewhere.
91 # The default number of engines to start if not given elsewhere.
92 # c.MPIExecEngineSetLauncher.n = 1
92 # c.MPIExecEngineSetLauncher.n = 1
93
93
94 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
95 # SSH launchers
95 # SSH launchers
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97
97
98 # Todo
98 # Todo
99
99
100
100
101 #-----------------------------------------------------------------------------
101 #-----------------------------------------------------------------------------
102 # Unix batch (PBS) schedulers launchers
102 # Unix batch (PBS) schedulers launchers
103 #-----------------------------------------------------------------------------
103 #-----------------------------------------------------------------------------
104
104
105 # The command line program to use to submit a PBS job.
105 # The command line program to use to submit a PBS job.
106 # c.PBSControllerLauncher.submit_command = 'qsub'
106 # c.PBSControllerLauncher.submit_command = 'qsub'
107
107
108 # The command line program to use to delete a PBS job.
108 # The command line program to use to delete a PBS job.
109 # c.PBSControllerLauncher.delete_command = 'qdel'
109 # c.PBSControllerLauncher.delete_command = 'qdel'
110
110
111 # A regular expression that takes the output of qsub and find the job id.
111 # A regular expression that takes the output of qsub and find the job id.
112 # c.PBSControllerLauncher.job_id_regexp = '\d+'
112 # c.PBSControllerLauncher.job_id_regexp = r'\d+'
113
113
114 # The batch submission script used to start the controller. This is where
114 # The batch submission script used to start the controller. This is where
115 # environment variables would be setup, etc. This string is interpolated using
115 # environment variables would be setup, etc. This string is interpolated using
116 # the Itpl module in IPython.external. Basically, you can use ${n} for the
116 # the Itpl module in IPython.external. Basically, you can use ${n} for the
117 # number of engine and ${cluster_dir} for the cluster_dir.
117 # number of engine and ${cluster_dir} for the cluster_dir.
118 # c.PBSControllerLauncher.batch_template = """"""
118 # c.PBSControllerLauncher.batch_template = """"""
119
119
120 # The name of the instantiated batch script that will actually be used to
120 # The name of the instantiated batch script that will actually be used to
121 # submit the job. This will be written to the cluster directory.
121 # submit the job. This will be written to the cluster directory.
122 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
122 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
123
123
124
124
125 # The command line program to use to submit a PBS job.
125 # The command line program to use to submit a PBS job.
126 # c.PBSEngineSetLauncher.submit_command = 'qsub'
126 # c.PBSEngineSetLauncher.submit_command = 'qsub'
127
127
128 # The command line program to use to delete a PBS job.
128 # The command line program to use to delete a PBS job.
129 # c.PBSEngineSetLauncher.delete_command = 'qdel'
129 # c.PBSEngineSetLauncher.delete_command = 'qdel'
130
130
131 # A regular expression that takes the output of qsub and find the job id.
131 # A regular expression that takes the output of qsub and find the job id.
132 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
132 # c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
133
133
134 # The batch submission script used to start the engines. This is where
134 # The batch submission script used to start the engines. This is where
135 # environment variables would be setup, etc. This string is interpolated using
135 # environment variables would be setup, etc. This string is interpolated using
136 # the Itpl module in IPython.external. Basically, you can use ${n} for the
136 # the Itpl module in IPython.external. Basically, you can use ${n} for the
137 # number of engine and ${cluster_dir} for the cluster_dir.
137 # number of engine and ${cluster_dir} for the cluster_dir.
138 # c.PBSEngineSetLauncher.batch_template = """"""
138 # c.PBSEngineSetLauncher.batch_template = """"""
139
139
140 # The name of the instantiated batch script that will actually be used to
140 # The name of the instantiated batch script that will actually be used to
141 # submit the job. This will be written to the cluster directory.
141 # submit the job. This will be written to the cluster directory.
142 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
142 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
143
143
144 #-----------------------------------------------------------------------------
144 #-----------------------------------------------------------------------------
145 # Windows HPC Server 2008 launcher configuration
145 # Windows HPC Server 2008 launcher configuration
146 #-----------------------------------------------------------------------------
146 #-----------------------------------------------------------------------------
147
147
148 # c.IPControllerJob.job_name = 'IPController'
148 # c.IPControllerJob.job_name = 'IPController'
149 # c.IPControllerJob.is_exclusive = False
149 # c.IPControllerJob.is_exclusive = False
150 # c.IPControllerJob.username = 'USERDOMAIN\\USERNAME'
150 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
151 # c.IPControllerJob.priority = 'Highest'
151 # c.IPControllerJob.priority = 'Highest'
152 # c.IPControllerJob.requested_nodes = ''
152 # c.IPControllerJob.requested_nodes = ''
153 # c.IPControllerJob.project = 'MyProject'
153 # c.IPControllerJob.project = 'MyProject'
154
154
155 # c.IPControllerTask.task_name = 'IPController'
155 # c.IPControllerTask.task_name = 'IPController'
156 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
156 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
157 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
157 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
158 # c.IPControllerTask.environment_variables = {}
158 # c.IPControllerTask.environment_variables = {}
159
159
160 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
160 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
161 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
161 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
162
162
163
163
164 # c.IPEngineSetJob.job_name = 'IPEngineSet'
164 # c.IPEngineSetJob.job_name = 'IPEngineSet'
165 # c.IPEngineSetJob.is_exclusive = False
165 # c.IPEngineSetJob.is_exclusive = False
166 # c.IPEngineSetJob.username = 'USERDOMAIN\\USERNAME'
166 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
167 # c.IPEngineSetJob.priority = 'Highest'
167 # c.IPEngineSetJob.priority = 'Highest'
168 # c.IPEngineSetJob.requested_nodes = ''
168 # c.IPEngineSetJob.requested_nodes = ''
169 # c.IPEngineSetJob.project = 'MyProject'
169 # c.IPEngineSetJob.project = 'MyProject'
170
170
171 # c.IPEngineTask.task_name = 'IPEngine'
171 # c.IPEngineTask.task_name = 'IPEngine'
172 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
172 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
173 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
173 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
174 # c.IPEngineTask.environment_variables = {}
174 # c.IPEngineTask.environment_variables = {}
175
175
176 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
176 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
177 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
177 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
178
178
179
179
180
180
181
181
182
182
183
183
184
184
@@ -1,481 +1,475 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import shutil
21 import shutil
22 import sys
22 import sys
23
23
24 from twisted.python import log
24 from twisted.python import log
25
25
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.config.loader import PyFileConfigLoader
27 from IPython.config.loader import PyFileConfigLoader
28 from IPython.core.application import Application
28 from IPython.core.application import Application
29 from IPython.core.component import Component
29 from IPython.core.component import Component
30 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
30 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
31 from IPython.utils.traitlets import Unicode, Bool
31 from IPython.utils.traitlets import Unicode, Bool
32 from IPython.utils import genutils
32 from IPython.utils import genutils
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Imports
35 # Imports
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38
38
39 class ClusterDirError(Exception):
39 class ClusterDirError(Exception):
40 pass
40 pass
41
41
42
42
43 class PIDFileError(Exception):
43 class PIDFileError(Exception):
44 pass
44 pass
45
45
46
46
47 class ClusterDir(Component):
47 class ClusterDir(Component):
48 """An object to manage the cluster directory and its resources.
48 """An object to manage the cluster directory and its resources.
49
49
50 The cluster directory is used by :command:`ipcontroller`,
50 The cluster directory is used by :command:`ipcontroller`,
51 :command:`ipcontroller` and :command:`ipcontroller` to manage the
51 :command:`ipcontroller` and :command:`ipcontroller` to manage the
52 configuration, logging and security of these applications.
52 configuration, logging and security of these applications.
53
53
54 This object knows how to find, create and manage these directories. This
54 This object knows how to find, create and manage these directories. This
55 should be used by any code that want's to handle cluster directories.
55 should be used by any code that want's to handle cluster directories.
56 """
56 """
57
57
58 security_dir_name = Unicode('security')
58 security_dir_name = Unicode('security')
59 log_dir_name = Unicode('log')
59 log_dir_name = Unicode('log')
60 pid_dir_name = Unicode('pid')
60 pid_dir_name = Unicode('pid')
61 security_dir = Unicode(u'')
61 security_dir = Unicode(u'')
62 log_dir = Unicode(u'')
62 log_dir = Unicode(u'')
63 pid_dir = Unicode(u'')
63 pid_dir = Unicode(u'')
64 location = Unicode(u'')
64 location = Unicode(u'')
65
65
66 def __init__(self, location):
66 def __init__(self, location):
67 super(ClusterDir, self).__init__(None)
67 super(ClusterDir, self).__init__(None)
68 self.location = location
68 self.location = location
69
69
70 def _location_changed(self, name, old, new):
70 def _location_changed(self, name, old, new):
71 if not os.path.isdir(new):
71 if not os.path.isdir(new):
72 os.makedirs(new, mode=0777)
72 os.makedirs(new)
73 else:
74 os.chmod(new, 0777)
75 self.security_dir = os.path.join(new, self.security_dir_name)
73 self.security_dir = os.path.join(new, self.security_dir_name)
76 self.log_dir = os.path.join(new, self.log_dir_name)
74 self.log_dir = os.path.join(new, self.log_dir_name)
77 self.pid_dir = os.path.join(new, self.pid_dir_name)
75 self.pid_dir = os.path.join(new, self.pid_dir_name)
78 self.check_dirs()
76 self.check_dirs()
79
77
80 def _log_dir_changed(self, name, old, new):
78 def _log_dir_changed(self, name, old, new):
81 self.check_log_dir()
79 self.check_log_dir()
82
80
83 def check_log_dir(self):
81 def check_log_dir(self):
84 if not os.path.isdir(self.log_dir):
82 if not os.path.isdir(self.log_dir):
85 os.mkdir(self.log_dir, 0777)
83 os.mkdir(self.log_dir)
86 else:
87 os.chmod(self.log_dir, 0777)
88
84
89 def _security_dir_changed(self, name, old, new):
85 def _security_dir_changed(self, name, old, new):
90 self.check_security_dir()
86 self.check_security_dir()
91
87
92 def check_security_dir(self):
88 def check_security_dir(self):
93 if not os.path.isdir(self.security_dir):
89 if not os.path.isdir(self.security_dir):
94 os.mkdir(self.security_dir, 0700)
90 os.mkdir(self.security_dir, 0700)
95 else:
91 os.chmod(self.security_dir, 0700)
96 os.chmod(self.security_dir, 0700)
97
92
98 def _pid_dir_changed(self, name, old, new):
93 def _pid_dir_changed(self, name, old, new):
99 self.check_pid_dir()
94 self.check_pid_dir()
100
95
101 def check_pid_dir(self):
96 def check_pid_dir(self):
102 if not os.path.isdir(self.pid_dir):
97 if not os.path.isdir(self.pid_dir):
103 os.mkdir(self.pid_dir, 0700)
98 os.mkdir(self.pid_dir, 0700)
104 else:
99 os.chmod(self.pid_dir, 0700)
105 os.chmod(self.pid_dir, 0700)
106
100
107 def check_dirs(self):
101 def check_dirs(self):
108 self.check_security_dir()
102 self.check_security_dir()
109 self.check_log_dir()
103 self.check_log_dir()
110 self.check_pid_dir()
104 self.check_pid_dir()
111
105
112 def load_config_file(self, filename):
106 def load_config_file(self, filename):
113 """Load a config file from the top level of the cluster dir.
107 """Load a config file from the top level of the cluster dir.
114
108
115 Parameters
109 Parameters
116 ----------
110 ----------
117 filename : unicode or str
111 filename : unicode or str
118 The filename only of the config file that must be located in
112 The filename only of the config file that must be located in
119 the top-level of the cluster directory.
113 the top-level of the cluster directory.
120 """
114 """
121 loader = PyFileConfigLoader(filename, self.location)
115 loader = PyFileConfigLoader(filename, self.location)
122 return loader.load_config()
116 return loader.load_config()
123
117
124 def copy_config_file(self, config_file, path=None, overwrite=False):
118 def copy_config_file(self, config_file, path=None, overwrite=False):
125 """Copy a default config file into the active cluster directory.
119 """Copy a default config file into the active cluster directory.
126
120
127 Default configuration files are kept in :mod:`IPython.config.default`.
121 Default configuration files are kept in :mod:`IPython.config.default`.
128 This function moves these from that location to the working cluster
122 This function moves these from that location to the working cluster
129 directory.
123 directory.
130 """
124 """
131 if path is None:
125 if path is None:
132 import IPython.config.default
126 import IPython.config.default
133 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
127 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
134 path = os.path.sep.join(path)
128 path = os.path.sep.join(path)
135 src = os.path.join(path, config_file)
129 src = os.path.join(path, config_file)
136 dst = os.path.join(self.location, config_file)
130 dst = os.path.join(self.location, config_file)
137 if not os.path.isfile(dst) or overwrite:
131 if not os.path.isfile(dst) or overwrite:
138 shutil.copy(src, dst)
132 shutil.copy(src, dst)
139
133
140 def copy_all_config_files(self, path=None, overwrite=False):
134 def copy_all_config_files(self, path=None, overwrite=False):
141 """Copy all config files into the active cluster directory."""
135 """Copy all config files into the active cluster directory."""
142 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
136 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
143 u'ipcluster_config.py']:
137 u'ipcluster_config.py']:
144 self.copy_config_file(f, path=path, overwrite=overwrite)
138 self.copy_config_file(f, path=path, overwrite=overwrite)
145
139
146 @classmethod
140 @classmethod
147 def create_cluster_dir(csl, cluster_dir):
141 def create_cluster_dir(csl, cluster_dir):
148 """Create a new cluster directory given a full path.
142 """Create a new cluster directory given a full path.
149
143
150 Parameters
144 Parameters
151 ----------
145 ----------
152 cluster_dir : str
146 cluster_dir : str
153 The full path to the cluster directory. If it does exist, it will
147 The full path to the cluster directory. If it does exist, it will
154 be used. If not, it will be created.
148 be used. If not, it will be created.
155 """
149 """
156 return ClusterDir(cluster_dir)
150 return ClusterDir(cluster_dir)
157
151
158 @classmethod
152 @classmethod
159 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
153 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
160 """Create a cluster dir by profile name and path.
154 """Create a cluster dir by profile name and path.
161
155
162 Parameters
156 Parameters
163 ----------
157 ----------
164 path : str
158 path : str
165 The path (directory) to put the cluster directory in.
159 The path (directory) to put the cluster directory in.
166 profile : str
160 profile : str
167 The name of the profile. The name of the cluster directory will
161 The name of the profile. The name of the cluster directory will
168 be "cluster_<profile>".
162 be "cluster_<profile>".
169 """
163 """
170 if not os.path.isdir(path):
164 if not os.path.isdir(path):
171 raise ClusterDirError('Directory not found: %s' % path)
165 raise ClusterDirError('Directory not found: %s' % path)
172 cluster_dir = os.path.join(path, u'cluster_' + profile)
166 cluster_dir = os.path.join(path, u'cluster_' + profile)
173 return ClusterDir(cluster_dir)
167 return ClusterDir(cluster_dir)
174
168
175 @classmethod
169 @classmethod
176 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
170 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
177 """Find an existing cluster dir by profile name, return its ClusterDir.
171 """Find an existing cluster dir by profile name, return its ClusterDir.
178
172
179 This searches through a sequence of paths for a cluster dir. If it
173 This searches through a sequence of paths for a cluster dir. If it
180 is not found, a :class:`ClusterDirError` exception will be raised.
174 is not found, a :class:`ClusterDirError` exception will be raised.
181
175
182 The search path algorithm is:
176 The search path algorithm is:
183 1. ``os.getcwd()``
177 1. ``os.getcwd()``
184 2. ``ipython_dir``
178 2. ``ipython_dir``
185 3. The directories found in the ":" separated
179 3. The directories found in the ":" separated
186 :env:`IPCLUSTER_DIR_PATH` environment variable.
180 :env:`IPCLUSTER_DIR_PATH` environment variable.
187
181
188 Parameters
182 Parameters
189 ----------
183 ----------
190 ipython_dir : unicode or str
184 ipython_dir : unicode or str
191 The IPython directory to use.
185 The IPython directory to use.
192 profile : unicode or str
186 profile : unicode or str
193 The name of the profile. The name of the cluster directory
187 The name of the profile. The name of the cluster directory
194 will be "cluster_<profile>".
188 will be "cluster_<profile>".
195 """
189 """
196 dirname = u'cluster_' + profile
190 dirname = u'cluster_' + profile
197 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
191 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
198 if cluster_dir_paths:
192 if cluster_dir_paths:
199 cluster_dir_paths = cluster_dir_paths.split(':')
193 cluster_dir_paths = cluster_dir_paths.split(':')
200 else:
194 else:
201 cluster_dir_paths = []
195 cluster_dir_paths = []
202 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
196 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
203 for p in paths:
197 for p in paths:
204 cluster_dir = os.path.join(p, dirname)
198 cluster_dir = os.path.join(p, dirname)
205 if os.path.isdir(cluster_dir):
199 if os.path.isdir(cluster_dir):
206 return ClusterDir(cluster_dir)
200 return ClusterDir(cluster_dir)
207 else:
201 else:
208 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
202 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
209
203
210 @classmethod
204 @classmethod
211 def find_cluster_dir(cls, cluster_dir):
205 def find_cluster_dir(cls, cluster_dir):
212 """Find/create a cluster dir and return its ClusterDir.
206 """Find/create a cluster dir and return its ClusterDir.
213
207
214 This will create the cluster directory if it doesn't exist.
208 This will create the cluster directory if it doesn't exist.
215
209
216 Parameters
210 Parameters
217 ----------
211 ----------
218 cluster_dir : unicode or str
212 cluster_dir : unicode or str
219 The path of the cluster directory. This is expanded using
213 The path of the cluster directory. This is expanded using
220 :func:`IPython.utils.genutils.expand_path`.
214 :func:`IPython.utils.genutils.expand_path`.
221 """
215 """
222 cluster_dir = genutils.expand_path(cluster_dir)
216 cluster_dir = genutils.expand_path(cluster_dir)
223 if not os.path.isdir(cluster_dir):
217 if not os.path.isdir(cluster_dir):
224 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
218 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
225 return ClusterDir(cluster_dir)
219 return ClusterDir(cluster_dir)
226
220
227
221
228 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
222 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
229 """Default command line options for IPython cluster applications."""
223 """Default command line options for IPython cluster applications."""
230
224
231 def _add_other_arguments(self):
225 def _add_other_arguments(self):
232 self.parser.add_argument('--ipython-dir',
226 self.parser.add_argument('--ipython-dir',
233 dest='Global.ipython_dir',type=unicode,
227 dest='Global.ipython_dir',type=unicode,
234 help='Set to override default location of Global.ipython_dir.',
228 help='Set to override default location of Global.ipython_dir.',
235 default=NoConfigDefault,
229 default=NoConfigDefault,
236 metavar='Global.ipython_dir'
230 metavar='Global.ipython_dir'
237 )
231 )
238 self.parser.add_argument('-p', '--profile',
232 self.parser.add_argument('-p', '--profile',
239 dest='Global.profile',type=unicode,
233 dest='Global.profile',type=unicode,
240 help='The string name of the profile to be used. This determines '
234 help='The string name of the profile to be used. This determines '
241 'the name of the cluster dir as: cluster_<profile>. The default profile '
235 'the name of the cluster dir as: cluster_<profile>. The default profile '
242 'is named "default". The cluster directory is resolve this way '
236 'is named "default". The cluster directory is resolve this way '
243 'if the --cluster-dir option is not used.',
237 'if the --cluster-dir option is not used.',
244 default=NoConfigDefault,
238 default=NoConfigDefault,
245 metavar='Global.profile'
239 metavar='Global.profile'
246 )
240 )
247 self.parser.add_argument('--log-level',
241 self.parser.add_argument('--log-level',
248 dest="Global.log_level",type=int,
242 dest="Global.log_level",type=int,
249 help='Set the log level (0,10,20,30,40,50). Default is 30.',
243 help='Set the log level (0,10,20,30,40,50). Default is 30.',
250 default=NoConfigDefault,
244 default=NoConfigDefault,
251 metavar="Global.log_level"
245 metavar="Global.log_level"
252 )
246 )
253 self.parser.add_argument('--cluster-dir',
247 self.parser.add_argument('--cluster-dir',
254 dest='Global.cluster_dir',type=unicode,
248 dest='Global.cluster_dir',type=unicode,
255 help='Set the cluster dir. This overrides the logic used by the '
249 help='Set the cluster dir. This overrides the logic used by the '
256 '--profile option.',
250 '--profile option.',
257 default=NoConfigDefault,
251 default=NoConfigDefault,
258 metavar='Global.cluster_dir'
252 metavar='Global.cluster_dir'
259 ),
253 ),
260 self.parser.add_argument('--work-dir',
254 self.parser.add_argument('--work-dir',
261 dest='Global.work_dir',type=unicode,
255 dest='Global.work_dir',type=unicode,
262 help='Set the working dir for the process.',
256 help='Set the working dir for the process.',
263 default=NoConfigDefault,
257 default=NoConfigDefault,
264 metavar='Global.work_dir'
258 metavar='Global.work_dir'
265 )
259 )
266 self.parser.add_argument('--clean-logs',
260 self.parser.add_argument('--clean-logs',
267 dest='Global.clean_logs', action='store_true',
261 dest='Global.clean_logs', action='store_true',
268 help='Delete old log flies before starting.',
262 help='Delete old log flies before starting.',
269 default=NoConfigDefault
263 default=NoConfigDefault
270 )
264 )
271 self.parser.add_argument('--no-clean-logs',
265 self.parser.add_argument('--no-clean-logs',
272 dest='Global.clean_logs', action='store_false',
266 dest='Global.clean_logs', action='store_false',
273 help="Don't Delete old log flies before starting.",
267 help="Don't Delete old log flies before starting.",
274 default=NoConfigDefault
268 default=NoConfigDefault
275 )
269 )
276
270
277 class ApplicationWithClusterDir(Application):
271 class ApplicationWithClusterDir(Application):
278 """An application that puts everything into a cluster directory.
272 """An application that puts everything into a cluster directory.
279
273
280 Instead of looking for things in the ipython_dir, this type of application
274 Instead of looking for things in the ipython_dir, this type of application
281 will use its own private directory called the "cluster directory"
275 will use its own private directory called the "cluster directory"
282 for things like config files, log files, etc.
276 for things like config files, log files, etc.
283
277
284 The cluster directory is resolved as follows:
278 The cluster directory is resolved as follows:
285
279
286 * If the ``--cluster-dir`` option is given, it is used.
280 * If the ``--cluster-dir`` option is given, it is used.
287 * If ``--cluster-dir`` is not given, the application directory is
281 * If ``--cluster-dir`` is not given, the application directory is
288 resolve using the profile name as ``cluster_<profile>``. The search
282 resolve using the profile name as ``cluster_<profile>``. The search
289 path for this directory is then i) cwd if it is found there
283 path for this directory is then i) cwd if it is found there
290 and ii) in ipython_dir otherwise.
284 and ii) in ipython_dir otherwise.
291
285
292 The config file for the application is to be put in the cluster
286 The config file for the application is to be put in the cluster
293 dir and named the value of the ``config_file_name`` class attribute.
287 dir and named the value of the ``config_file_name`` class attribute.
294 """
288 """
295
289
296 auto_create_cluster_dir = True
290 auto_create_cluster_dir = True
297
291
298 def create_default_config(self):
292 def create_default_config(self):
299 super(ApplicationWithClusterDir, self).create_default_config()
293 super(ApplicationWithClusterDir, self).create_default_config()
300 self.default_config.Global.profile = u'default'
294 self.default_config.Global.profile = u'default'
301 self.default_config.Global.cluster_dir = u''
295 self.default_config.Global.cluster_dir = u''
302 self.default_config.Global.work_dir = os.getcwd()
296 self.default_config.Global.work_dir = os.getcwd()
303 self.default_config.Global.log_to_file = False
297 self.default_config.Global.log_to_file = False
304 self.default_config.Global.clean_logs = False
298 self.default_config.Global.clean_logs = False
305
299
306 def create_command_line_config(self):
300 def create_command_line_config(self):
307 """Create and return a command line config loader."""
301 """Create and return a command line config loader."""
308 return AppWithClusterDirArgParseConfigLoader(
302 return AppWithClusterDirArgParseConfigLoader(
309 description=self.description,
303 description=self.description,
310 version=release.version
304 version=release.version
311 )
305 )
312
306
313 def find_resources(self):
307 def find_resources(self):
314 """This resolves the cluster directory.
308 """This resolves the cluster directory.
315
309
316 This tries to find the cluster directory and if successful, it will
310 This tries to find the cluster directory and if successful, it will
317 have done:
311 have done:
318 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
312 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
319 the application.
313 the application.
320 * Sets ``self.cluster_dir`` attribute of the application and config
314 * Sets ``self.cluster_dir`` attribute of the application and config
321 objects.
315 objects.
322
316
323 The algorithm used for this is as follows:
317 The algorithm used for this is as follows:
324 1. Try ``Global.cluster_dir``.
318 1. Try ``Global.cluster_dir``.
325 2. Try using ``Global.profile``.
319 2. Try using ``Global.profile``.
326 3. If both of these fail and ``self.auto_create_cluster_dir`` is
320 3. If both of these fail and ``self.auto_create_cluster_dir`` is
327 ``True``, then create the new cluster dir in the IPython directory.
321 ``True``, then create the new cluster dir in the IPython directory.
328 4. If all fails, then raise :class:`ClusterDirError`.
322 4. If all fails, then raise :class:`ClusterDirError`.
329 """
323 """
330
324
331 try:
325 try:
332 cluster_dir = self.command_line_config.Global.cluster_dir
326 cluster_dir = self.command_line_config.Global.cluster_dir
333 except AttributeError:
327 except AttributeError:
334 cluster_dir = self.default_config.Global.cluster_dir
328 cluster_dir = self.default_config.Global.cluster_dir
335 cluster_dir = genutils.expand_path(cluster_dir)
329 cluster_dir = genutils.expand_path(cluster_dir)
336 try:
330 try:
337 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
331 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
338 except ClusterDirError:
332 except ClusterDirError:
339 pass
333 pass
340 else:
334 else:
341 self.log.info('Using existing cluster dir: %s' % \
335 self.log.info('Using existing cluster dir: %s' % \
342 self.cluster_dir_obj.location
336 self.cluster_dir_obj.location
343 )
337 )
344 self.finish_cluster_dir()
338 self.finish_cluster_dir()
345 return
339 return
346
340
347 try:
341 try:
348 self.profile = self.command_line_config.Global.profile
342 self.profile = self.command_line_config.Global.profile
349 except AttributeError:
343 except AttributeError:
350 self.profile = self.default_config.Global.profile
344 self.profile = self.default_config.Global.profile
351 try:
345 try:
352 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
346 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
353 self.ipython_dir, self.profile)
347 self.ipython_dir, self.profile)
354 except ClusterDirError:
348 except ClusterDirError:
355 pass
349 pass
356 else:
350 else:
357 self.log.info('Using existing cluster dir: %s' % \
351 self.log.info('Using existing cluster dir: %s' % \
358 self.cluster_dir_obj.location
352 self.cluster_dir_obj.location
359 )
353 )
360 self.finish_cluster_dir()
354 self.finish_cluster_dir()
361 return
355 return
362
356
363 if self.auto_create_cluster_dir:
357 if self.auto_create_cluster_dir:
364 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
358 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
365 self.ipython_dir, self.profile
359 self.ipython_dir, self.profile
366 )
360 )
367 self.log.info('Creating new cluster dir: %s' % \
361 self.log.info('Creating new cluster dir: %s' % \
368 self.cluster_dir_obj.location
362 self.cluster_dir_obj.location
369 )
363 )
370 self.finish_cluster_dir()
364 self.finish_cluster_dir()
371 else:
365 else:
372 raise ClusterDirError('Could not find a valid cluster directory.')
366 raise ClusterDirError('Could not find a valid cluster directory.')
373
367
374 def finish_cluster_dir(self):
368 def finish_cluster_dir(self):
375 # Set the cluster directory
369 # Set the cluster directory
376 self.cluster_dir = self.cluster_dir_obj.location
370 self.cluster_dir = self.cluster_dir_obj.location
377
371
378 # These have to be set because they could be different from the one
372 # These have to be set because they could be different from the one
379 # that we just computed. Because command line has the highest
373 # that we just computed. Because command line has the highest
380 # priority, this will always end up in the master_config.
374 # priority, this will always end up in the master_config.
381 self.default_config.Global.cluster_dir = self.cluster_dir
375 self.default_config.Global.cluster_dir = self.cluster_dir
382 self.command_line_config.Global.cluster_dir = self.cluster_dir
376 self.command_line_config.Global.cluster_dir = self.cluster_dir
383
377
384 # Set the search path to the cluster directory
378 # Set the search path to the cluster directory
385 self.config_file_paths = (self.cluster_dir,)
379 self.config_file_paths = (self.cluster_dir,)
386
380
387 def find_config_file_name(self):
381 def find_config_file_name(self):
388 """Find the config file name for this application."""
382 """Find the config file name for this application."""
389 # For this type of Application it should be set as a class attribute.
383 # For this type of Application it should be set as a class attribute.
390 if not hasattr(self, 'config_file_name'):
384 if not hasattr(self, 'config_file_name'):
391 self.log.critical("No config filename found")
385 self.log.critical("No config filename found")
392
386
393 def find_config_file_paths(self):
387 def find_config_file_paths(self):
394 # Set the search path to the cluster directory
388 # Set the search path to the cluster directory
395 self.config_file_paths = (self.cluster_dir,)
389 self.config_file_paths = (self.cluster_dir,)
396
390
397 def pre_construct(self):
391 def pre_construct(self):
398 # The log and security dirs were set earlier, but here we put them
392 # The log and security dirs were set earlier, but here we put them
399 # into the config and log them.
393 # into the config and log them.
400 config = self.master_config
394 config = self.master_config
401 sdir = self.cluster_dir_obj.security_dir
395 sdir = self.cluster_dir_obj.security_dir
402 self.security_dir = config.Global.security_dir = sdir
396 self.security_dir = config.Global.security_dir = sdir
403 ldir = self.cluster_dir_obj.log_dir
397 ldir = self.cluster_dir_obj.log_dir
404 self.log_dir = config.Global.log_dir = ldir
398 self.log_dir = config.Global.log_dir = ldir
405 pdir = self.cluster_dir_obj.pid_dir
399 pdir = self.cluster_dir_obj.pid_dir
406 self.pid_dir = config.Global.pid_dir = pdir
400 self.pid_dir = config.Global.pid_dir = pdir
407 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
401 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
408 config.Global.work_dir = unicode(genutils.expand_path(config.Global.work_dir))
402 config.Global.work_dir = unicode(genutils.expand_path(config.Global.work_dir))
409 # Change to the working directory. We do this just before construct
403 # Change to the working directory. We do this just before construct
410 # is called so all the components there have the right working dir.
404 # is called so all the components there have the right working dir.
411 self.to_work_dir()
405 self.to_work_dir()
412
406
413 def to_work_dir(self):
407 def to_work_dir(self):
414 wd = self.master_config.Global.work_dir
408 wd = self.master_config.Global.work_dir
415 if unicode(wd) != unicode(os.getcwd()):
409 if unicode(wd) != unicode(os.getcwd()):
416 os.chdir(wd)
410 os.chdir(wd)
417 self.log.info("Changing to working dir: %s" % wd)
411 self.log.info("Changing to working dir: %s" % wd)
418
412
419 def start_logging(self):
413 def start_logging(self):
420 # Remove old log files
414 # Remove old log files
421 if self.master_config.Global.clean_logs:
415 if self.master_config.Global.clean_logs:
422 log_dir = self.master_config.Global.log_dir
416 log_dir = self.master_config.Global.log_dir
423 for f in os.listdir(log_dir):
417 for f in os.listdir(log_dir):
424 if f.startswith(self.name + u'-') and f.endswith('.log'):
418 if f.startswith(self.name + u'-') and f.endswith('.log'):
425 os.remove(os.path.join(log_dir, f))
419 os.remove(os.path.join(log_dir, f))
426 # Start logging to the new log file
420 # Start logging to the new log file
427 if self.master_config.Global.log_to_file:
421 if self.master_config.Global.log_to_file:
428 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
422 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
429 logfile = os.path.join(self.log_dir, log_filename)
423 logfile = os.path.join(self.log_dir, log_filename)
430 open_log_file = open(logfile, 'w')
424 open_log_file = open(logfile, 'w')
431 else:
425 else:
432 open_log_file = sys.stdout
426 open_log_file = sys.stdout
433 log.startLogging(open_log_file)
427 log.startLogging(open_log_file)
434
428
435 def write_pid_file(self, overwrite=False):
429 def write_pid_file(self, overwrite=False):
436 """Create a .pid file in the pid_dir with my pid.
430 """Create a .pid file in the pid_dir with my pid.
437
431
438 This must be called after pre_construct, which sets `self.pid_dir`.
432 This must be called after pre_construct, which sets `self.pid_dir`.
439 This raises :exc:`PIDFileError` if the pid file exists already.
433 This raises :exc:`PIDFileError` if the pid file exists already.
440 """
434 """
441 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
435 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
442 if os.path.isfile(pid_file):
436 if os.path.isfile(pid_file):
443 pid = self.get_pid_from_file()
437 pid = self.get_pid_from_file()
444 if not overwrite:
438 if not overwrite:
445 raise PIDFileError(
439 raise PIDFileError(
446 'The pid file [%s] already exists. \nThis could mean that this '
440 'The pid file [%s] already exists. \nThis could mean that this '
447 'server is already running with [pid=%s].' % (pid_file, pid)
441 'server is already running with [pid=%s].' % (pid_file, pid)
448 )
442 )
449 with open(pid_file, 'w') as f:
443 with open(pid_file, 'w') as f:
450 self.log.info("Creating pid file: %s" % pid_file)
444 self.log.info("Creating pid file: %s" % pid_file)
451 f.write(repr(os.getpid())+'\n')
445 f.write(repr(os.getpid())+'\n')
452
446
453 def remove_pid_file(self):
447 def remove_pid_file(self):
454 """Remove the pid file.
448 """Remove the pid file.
455
449
456 This should be called at shutdown by registering a callback with
450 This should be called at shutdown by registering a callback with
457 :func:`reactor.addSystemEventTrigger`. This needs to return
451 :func:`reactor.addSystemEventTrigger`. This needs to return
458 ``None``.
452 ``None``.
459 """
453 """
460 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
454 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
461 if os.path.isfile(pid_file):
455 if os.path.isfile(pid_file):
462 try:
456 try:
463 self.log.info("Removing pid file: %s" % pid_file)
457 self.log.info("Removing pid file: %s" % pid_file)
464 os.remove(pid_file)
458 os.remove(pid_file)
465 except:
459 except:
466 self.log.warn("Error removing the pid file: %s" % pid_file)
460 self.log.warn("Error removing the pid file: %s" % pid_file)
467
461
468 def get_pid_from_file(self):
462 def get_pid_from_file(self):
469 """Get the pid from the pid file.
463 """Get the pid from the pid file.
470
464
471 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
465 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
472 """
466 """
473 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
467 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
474 if os.path.isfile(pid_file):
468 if os.path.isfile(pid_file):
475 with open(pid_file, 'r') as f:
469 with open(pid_file, 'r') as f:
476 pid = int(f.read().strip())
470 pid = int(f.read().strip())
477 return pid
471 return pid
478 else:
472 else:
479 raise PIDFileError('pid file not found: %s' % pid_file)
473 raise PIDFileError('pid file not found: %s' % pid_file)
480
474
481
475
@@ -1,455 +1,459 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import logging
18 import logging
19 import os
19 import os
20 import signal
20 import signal
21 import sys
21 import sys
22
22
23 if os.name=='posix':
23 if os.name=='posix':
24 from twisted.scripts._twistd_unix import daemonize
24 from twisted.scripts._twistd_unix import daemonize
25
25
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.external import argparse
27 from IPython.external import argparse
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 from IPython.utils.importstring import import_item
29 from IPython.utils.importstring import import_item
30
30
31 from IPython.kernel.clusterdir import (
31 from IPython.kernel.clusterdir import (
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 )
33 )
34
34
35 from twisted.internet import reactor, defer
35 from twisted.internet import reactor, defer
36 from twisted.python import log, failure
36 from twisted.python import log, failure
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # The ipcluster application
40 # The ipcluster application
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43
43
44 # Exit codes for ipcluster
44 # Exit codes for ipcluster
45
45
46 # This will be the exit code if the ipcluster appears to be running because
46 # This will be the exit code if the ipcluster appears to be running because
47 # a .pid file exists
47 # a .pid file exists
48 ALREADY_STARTED = 10
48 ALREADY_STARTED = 10
49
49
50 # This will be the exit code if ipcluster stop is run, but there is not .pid
50 # This will be the exit code if ipcluster stop is run, but there is not .pid
51 # file to be found.
51 # file to be found.
52 ALREADY_STOPPED = 11
52 ALREADY_STOPPED = 11
53
53
54
54
55 class IPClusterCLLoader(ArgParseConfigLoader):
55 class IPClusterCLLoader(ArgParseConfigLoader):
56
56
57 def _add_arguments(self):
57 def _add_arguments(self):
58 # This has all the common options that all subcommands use
58 # This has all the common options that all subcommands use
59 parent_parser1 = argparse.ArgumentParser(add_help=False)
59 parent_parser1 = argparse.ArgumentParser(add_help=False)
60 parent_parser1.add_argument('--ipython-dir',
60 parent_parser1.add_argument('--ipython-dir',
61 dest='Global.ipython_dir',type=unicode,
61 dest='Global.ipython_dir',type=unicode,
62 help='Set to override default location of Global.ipython_dir.',
62 help='Set to override default location of Global.ipython_dir.',
63 default=NoConfigDefault,
63 default=NoConfigDefault,
64 metavar='Global.ipython_dir')
64 metavar='Global.ipython_dir')
65 parent_parser1.add_argument('--log-level',
65 parent_parser1.add_argument('--log-level',
66 dest="Global.log_level",type=int,
66 dest="Global.log_level",type=int,
67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
68 default=NoConfigDefault,
68 default=NoConfigDefault,
69 metavar='Global.log_level')
69 metavar='Global.log_level')
70
70
71 # This has all the common options that other subcommands use
71 # This has all the common options that other subcommands use
72 parent_parser2 = argparse.ArgumentParser(add_help=False)
72 parent_parser2 = argparse.ArgumentParser(add_help=False)
73 parent_parser2.add_argument('-p','--profile',
73 parent_parser2.add_argument('-p','--profile',
74 dest='Global.profile',type=unicode,
74 dest='Global.profile',type=unicode,
75 help='The string name of the profile to be used. This determines '
75 help='The string name of the profile to be used. This determines '
76 'the name of the cluster dir as: cluster_<profile>. The default profile '
76 'the name of the cluster dir as: cluster_<profile>. The default profile '
77 'is named "default". The cluster directory is resolve this way '
77 'is named "default". The cluster directory is resolve this way '
78 'if the --cluster-dir option is not used.',
78 'if the --cluster-dir option is not used.',
79 default=NoConfigDefault,
79 default=NoConfigDefault,
80 metavar='Global.profile')
80 metavar='Global.profile')
81 parent_parser2.add_argument('--cluster-dir',
81 parent_parser2.add_argument('--cluster-dir',
82 dest='Global.cluster_dir',type=unicode,
82 dest='Global.cluster_dir',type=unicode,
83 help='Set the cluster dir. This overrides the logic used by the '
83 help='Set the cluster dir. This overrides the logic used by the '
84 '--profile option.',
84 '--profile option.',
85 default=NoConfigDefault,
85 default=NoConfigDefault,
86 metavar='Global.cluster_dir'),
86 metavar='Global.cluster_dir'),
87 parent_parser2.add_argument('--work-dir',
87 parent_parser2.add_argument('--work-dir',
88 dest='Global.work_dir',type=unicode,
88 dest='Global.work_dir',type=unicode,
89 help='Set the working dir for the process.',
89 help='Set the working dir for the process.',
90 default=NoConfigDefault,
90 default=NoConfigDefault,
91 metavar='Global.work_dir')
91 metavar='Global.work_dir')
92 parent_parser2.add_argument('--log-to-file',
92 parent_parser2.add_argument('--log-to-file',
93 action='store_true', dest='Global.log_to_file',
93 action='store_true', dest='Global.log_to_file',
94 default=NoConfigDefault,
94 default=NoConfigDefault,
95 help='Log to a file in the log directory (default is stdout)'
95 help='Log to a file in the log directory (default is stdout)'
96 )
96 )
97
97
98 subparsers = self.parser.add_subparsers(
98 subparsers = self.parser.add_subparsers(
99 dest='Global.subcommand',
99 dest='Global.subcommand',
100 title='ipcluster subcommands',
100 title='ipcluster subcommands',
101 description='ipcluster has a variety of subcommands. '
101 description='ipcluster has a variety of subcommands. '
102 'The general way of running ipcluster is "ipcluster <cmd> '
102 'The general way of running ipcluster is "ipcluster <cmd> '
103 ' [options]""',
103 ' [options]""',
104 help='For more help, type "ipcluster <cmd> -h"')
104 help='For more help, type "ipcluster <cmd> -h"')
105
105
106 parser_list = subparsers.add_parser(
106 parser_list = subparsers.add_parser(
107 'list',
107 'list',
108 help='List all clusters in cwd and ipython_dir.',
108 help='List all clusters in cwd and ipython_dir.',
109 parents=[parent_parser1]
109 parents=[parent_parser1]
110 )
110 )
111
111
112 parser_create = subparsers.add_parser(
112 parser_create = subparsers.add_parser(
113 'create',
113 'create',
114 help='Create a new cluster directory.',
114 help='Create a new cluster directory.',
115 parents=[parent_parser1, parent_parser2]
115 parents=[parent_parser1, parent_parser2]
116 )
116 )
117 parser_create.add_argument(
117 parser_create.add_argument(
118 '--reset-config',
118 '--reset-config',
119 dest='Global.reset_config', action='store_true',
119 dest='Global.reset_config', action='store_true',
120 default=NoConfigDefault,
120 default=NoConfigDefault,
121 help='Recopy the default config files to the cluster directory. '
121 help='Recopy the default config files to the cluster directory. '
122 'You will loose any modifications you have made to these files.'
122 'You will loose any modifications you have made to these files.'
123 )
123 )
124
124
125 parser_start = subparsers.add_parser(
125 parser_start = subparsers.add_parser(
126 'start',
126 'start',
127 help='Start a cluster.',
127 help='Start a cluster.',
128 parents=[parent_parser1, parent_parser2]
128 parents=[parent_parser1, parent_parser2]
129 )
129 )
130 parser_start.add_argument(
130 parser_start.add_argument(
131 '-n', '--number',
131 '-n', '--number',
132 type=int, dest='Global.n',
132 type=int, dest='Global.n',
133 default=NoConfigDefault,
133 default=NoConfigDefault,
134 help='The number of engines to start.',
134 help='The number of engines to start.',
135 metavar='Global.n'
135 metavar='Global.n'
136 )
136 )
137 parser_start.add_argument('--clean-logs',
137 parser_start.add_argument('--clean-logs',
138 dest='Global.clean_logs', action='store_true',
138 dest='Global.clean_logs', action='store_true',
139 help='Delete old log flies before starting.',
139 help='Delete old log flies before starting.',
140 default=NoConfigDefault
140 default=NoConfigDefault
141 )
141 )
142 parser_start.add_argument('--no-clean-logs',
142 parser_start.add_argument('--no-clean-logs',
143 dest='Global.clean_logs', action='store_false',
143 dest='Global.clean_logs', action='store_false',
144 help="Don't delete old log flies before starting.",
144 help="Don't delete old log flies before starting.",
145 default=NoConfigDefault
145 default=NoConfigDefault
146 )
146 )
147 parser_start.add_argument('--daemon',
147 parser_start.add_argument('--daemon',
148 dest='Global.daemonize', action='store_true',
148 dest='Global.daemonize', action='store_true',
149 help='Daemonize the ipcluster program. This implies --log-to-file',
149 help='Daemonize the ipcluster program. This implies --log-to-file',
150 default=NoConfigDefault
150 default=NoConfigDefault
151 )
151 )
152 parser_start.add_argument('--no-daemon',
152 parser_start.add_argument('--no-daemon',
153 dest='Global.daemonize', action='store_false',
153 dest='Global.daemonize', action='store_false',
154 help="Dont't daemonize the ipcluster program.",
154 help="Dont't daemonize the ipcluster program.",
155 default=NoConfigDefault
155 default=NoConfigDefault
156 )
156 )
157
157
158 parser_start = subparsers.add_parser(
158 parser_start = subparsers.add_parser(
159 'stop',
159 'stop',
160 help='Stop a cluster.',
160 help='Stop a cluster.',
161 parents=[parent_parser1, parent_parser2]
161 parents=[parent_parser1, parent_parser2]
162 )
162 )
163 parser_start.add_argument('--signal',
163 parser_start.add_argument('--signal',
164 dest='Global.signal', type=int,
164 dest='Global.signal', type=int,
165 help="The signal number to use in stopping the cluster (default=2).",
165 help="The signal number to use in stopping the cluster (default=2).",
166 metavar="Global.signal",
166 metavar="Global.signal",
167 default=NoConfigDefault
167 default=NoConfigDefault
168 )
168 )
169
169
170
170
171 default_config_file_name = u'ipcluster_config.py'
171 default_config_file_name = u'ipcluster_config.py'
172
172
173
173
174 class IPClusterApp(ApplicationWithClusterDir):
174 class IPClusterApp(ApplicationWithClusterDir):
175
175
176 name = u'ipcluster'
176 name = u'ipcluster'
177 description = 'Start an IPython cluster (controller and engines).'
177 description = 'Start an IPython cluster (controller and engines).'
178 config_file_name = default_config_file_name
178 config_file_name = default_config_file_name
179 default_log_level = logging.INFO
179 default_log_level = logging.INFO
180 auto_create_cluster_dir = False
180 auto_create_cluster_dir = False
181
181
182 def create_default_config(self):
182 def create_default_config(self):
183 super(IPClusterApp, self).create_default_config()
183 super(IPClusterApp, self).create_default_config()
184 self.default_config.Global.controller_launcher = \
184 self.default_config.Global.controller_launcher = \
185 'IPython.kernel.launcher.LocalControllerLauncher'
185 'IPython.kernel.launcher.LocalControllerLauncher'
186 self.default_config.Global.engine_launcher = \
186 self.default_config.Global.engine_launcher = \
187 'IPython.kernel.launcher.LocalEngineSetLauncher'
187 'IPython.kernel.launcher.LocalEngineSetLauncher'
188 self.default_config.Global.n = 2
188 self.default_config.Global.n = 2
189 self.default_config.Global.reset_config = False
189 self.default_config.Global.reset_config = False
190 self.default_config.Global.clean_logs = True
190 self.default_config.Global.clean_logs = True
191 self.default_config.Global.signal = 2
191 self.default_config.Global.signal = 2
192 self.default_config.Global.daemonize = False
192 self.default_config.Global.daemonize = False
193
193
194 def create_command_line_config(self):
194 def create_command_line_config(self):
195 """Create and return a command line config loader."""
195 """Create and return a command line config loader."""
196 return IPClusterCLLoader(
196 return IPClusterCLLoader(
197 description=self.description,
197 description=self.description,
198 version=release.version
198 version=release.version
199 )
199 )
200
200
201 def find_resources(self):
201 def find_resources(self):
202 subcommand = self.command_line_config.Global.subcommand
202 subcommand = self.command_line_config.Global.subcommand
203 if subcommand=='list':
203 if subcommand=='list':
204 self.list_cluster_dirs()
204 self.list_cluster_dirs()
205 # Exit immediately because there is nothing left to do.
205 # Exit immediately because there is nothing left to do.
206 self.exit()
206 self.exit()
207 elif subcommand=='create':
207 elif subcommand=='create':
208 self.auto_create_cluster_dir = True
208 self.auto_create_cluster_dir = True
209 super(IPClusterApp, self).find_resources()
209 super(IPClusterApp, self).find_resources()
210 elif subcommand=='start' or subcommand=='stop':
210 elif subcommand=='start' or subcommand=='stop':
211 self.auto_create_cluster_dir = False
211 self.auto_create_cluster_dir = True
212 try:
212 try:
213 super(IPClusterApp, self).find_resources()
213 super(IPClusterApp, self).find_resources()
214 except ClusterDirError:
214 except ClusterDirError:
215 raise ClusterDirError(
215 raise ClusterDirError(
216 "Could not find a cluster directory. A cluster dir must "
216 "Could not find a cluster directory. A cluster dir must "
217 "be created before running 'ipcluster start'. Do "
217 "be created before running 'ipcluster start'. Do "
218 "'ipcluster create -h' or 'ipcluster list -h' for more "
218 "'ipcluster create -h' or 'ipcluster list -h' for more "
219 "information about creating and listing cluster dirs."
219 "information about creating and listing cluster dirs."
220 )
220 )
221
221
222 def list_cluster_dirs(self):
222 def list_cluster_dirs(self):
223 # Find the search paths
223 # Find the search paths
224 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
224 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
225 if cluster_dir_paths:
225 if cluster_dir_paths:
226 cluster_dir_paths = cluster_dir_paths.split(':')
226 cluster_dir_paths = cluster_dir_paths.split(':')
227 else:
227 else:
228 cluster_dir_paths = []
228 cluster_dir_paths = []
229 try:
229 try:
230 ipython_dir = self.command_line_config.Global.ipython_dir
230 ipython_dir = self.command_line_config.Global.ipython_dir
231 except AttributeError:
231 except AttributeError:
232 ipython_dir = self.default_config.Global.ipython_dir
232 ipython_dir = self.default_config.Global.ipython_dir
233 paths = [os.getcwd(), ipython_dir] + \
233 paths = [os.getcwd(), ipython_dir] + \
234 cluster_dir_paths
234 cluster_dir_paths
235 paths = list(set(paths))
235 paths = list(set(paths))
236
236
237 self.log.info('Searching for cluster dirs in paths: %r' % paths)
237 self.log.info('Searching for cluster dirs in paths: %r' % paths)
238 for path in paths:
238 for path in paths:
239 files = os.listdir(path)
239 files = os.listdir(path)
240 for f in files:
240 for f in files:
241 full_path = os.path.join(path, f)
241 full_path = os.path.join(path, f)
242 if os.path.isdir(full_path) and f.startswith('cluster_'):
242 if os.path.isdir(full_path) and f.startswith('cluster_'):
243 profile = full_path.split('_')[-1]
243 profile = full_path.split('_')[-1]
244 start_cmd = 'ipcluster start -p %s -n 4' % profile
244 start_cmd = 'ipcluster start -p %s -n 4' % profile
245 print start_cmd + " ==> " + full_path
245 print start_cmd + " ==> " + full_path
246
246
247 def pre_construct(self):
247 def pre_construct(self):
248 # IPClusterApp.pre_construct() is where we cd to the working directory.
248 # IPClusterApp.pre_construct() is where we cd to the working directory.
249 super(IPClusterApp, self).pre_construct()
249 super(IPClusterApp, self).pre_construct()
250 config = self.master_config
250 config = self.master_config
251 try:
251 try:
252 daemon = config.Global.daemonize
252 daemon = config.Global.daemonize
253 if daemon:
253 if daemon:
254 config.Global.log_to_file = True
254 config.Global.log_to_file = True
255 except AttributeError:
255 except AttributeError:
256 pass
256 pass
257
257
258 def construct(self):
258 def construct(self):
259 config = self.master_config
259 config = self.master_config
260 if config.Global.subcommand=='list':
260 subcmd = config.Global.subcommand
261 pass
261 reset = config.Global.reset_config
262 elif config.Global.subcommand=='create':
262 if subcmd == 'list':
263 return
264 if subcmd == 'create':
263 self.log.info('Copying default config files to cluster directory '
265 self.log.info('Copying default config files to cluster directory '
264 '[overwrite=%r]' % (config.Global.reset_config,))
266 '[overwrite=%r]' % (reset,))
265 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
267 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
266 elif config.Global.subcommand=='start':
268 if subcmd =='start':
269 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
267 self.start_logging()
270 self.start_logging()
268 reactor.callWhenRunning(self.start_launchers)
271 reactor.callWhenRunning(self.start_launchers)
269
272
270 def start_launchers(self):
273 def start_launchers(self):
271 config = self.master_config
274 config = self.master_config
272
275
273 # Create the launchers. In both bases, we set the work_dir of
276 # Create the launchers. In both bases, we set the work_dir of
274 # the launcher to the cluster_dir. This is where the launcher's
277 # the launcher to the cluster_dir. This is where the launcher's
275 # subprocesses will be launched. It is not where the controller
278 # subprocesses will be launched. It is not where the controller
276 # and engine will be launched.
279 # and engine will be launched.
277 el_class = import_item(config.Global.engine_launcher)
280 el_class = import_item(config.Global.engine_launcher)
278 self.engine_launcher = el_class(
281 self.engine_launcher = el_class(
279 work_dir=self.cluster_dir, config=config
282 work_dir=self.cluster_dir, config=config
280 )
283 )
281 cl_class = import_item(config.Global.controller_launcher)
284 cl_class = import_item(config.Global.controller_launcher)
282 self.controller_launcher = cl_class(
285 self.controller_launcher = cl_class(
283 work_dir=self.cluster_dir, config=config
286 work_dir=self.cluster_dir, config=config
284 )
287 )
285
288
286 # Setup signals
289 # Setup signals
287 signal.signal(signal.SIGINT, self.sigint_handler)
290 signal.signal(signal.SIGINT, self.sigint_handler)
288
291
289 # Setup the observing of stopping. If the controller dies, shut
292 # Setup the observing of stopping. If the controller dies, shut
290 # everything down as that will be completely fatal for the engines.
293 # everything down as that will be completely fatal for the engines.
291 d1 = self.controller_launcher.observe_stop()
294 d1 = self.controller_launcher.observe_stop()
292 d1.addCallback(self.stop_launchers)
295 d1.addCallback(self.stop_launchers)
293 # But, we don't monitor the stopping of engines. An engine dying
296 # But, we don't monitor the stopping of engines. An engine dying
294 # is just fine and in principle a user could start a new engine.
297 # is just fine and in principle a user could start a new engine.
295 # Also, if we did monitor engine stopping, it is difficult to
298 # Also, if we did monitor engine stopping, it is difficult to
296 # know what to do when only some engines die. Currently, the
299 # know what to do when only some engines die. Currently, the
297 # observing of engine stopping is inconsistent. Some launchers
300 # observing of engine stopping is inconsistent. Some launchers
298 # might trigger on a single engine stopping, other wait until
301 # might trigger on a single engine stopping, other wait until
299 # all stop. TODO: think more about how to handle this.
302 # all stop. TODO: think more about how to handle this.
300
303
301 # Start the controller and engines
304 # Start the controller and engines
302 self._stopping = False # Make sure stop_launchers is not called 2x.
305 self._stopping = False # Make sure stop_launchers is not called 2x.
303 d = self.start_controller()
306 d = self.start_controller()
304 d.addCallback(self.start_engines)
307 d.addCallback(self.start_engines)
305 d.addCallback(self.startup_message)
308 d.addCallback(self.startup_message)
306 # If the controller or engines fail to start, stop everything
309 # If the controller or engines fail to start, stop everything
307 d.addErrback(self.stop_launchers)
310 d.addErrback(self.stop_launchers)
308 return d
311 return d
309
312
310 def startup_message(self, r=None):
313 def startup_message(self, r=None):
311 log.msg("IPython cluster: started")
314 log.msg("IPython cluster: started")
312 return r
315 return r
313
316
314 def start_controller(self, r=None):
317 def start_controller(self, r=None):
315 # log.msg("In start_controller")
318 # log.msg("In start_controller")
316 config = self.master_config
319 config = self.master_config
317 d = self.controller_launcher.start(
320 d = self.controller_launcher.start(
318 cluster_dir=config.Global.cluster_dir
321 cluster_dir=config.Global.cluster_dir
319 )
322 )
320 return d
323 return d
321
324
322 def start_engines(self, r=None):
325 def start_engines(self, r=None):
323 # log.msg("In start_engines")
326 # log.msg("In start_engines")
324 config = self.master_config
327 config = self.master_config
325 d = self.engine_launcher.start(
328 d = self.engine_launcher.start(
326 config.Global.n,
329 config.Global.n,
327 cluster_dir=config.Global.cluster_dir
330 cluster_dir=config.Global.cluster_dir
328 )
331 )
329 return d
332 return d
330
333
331 def stop_controller(self, r=None):
334 def stop_controller(self, r=None):
332 # log.msg("In stop_controller")
335 # log.msg("In stop_controller")
333 if self.controller_launcher.running:
336 if self.controller_launcher.running:
334 d = self.controller_launcher.stop()
337 d = self.controller_launcher.stop()
335 d.addErrback(self.log_err)
338 d.addErrback(self.log_err)
336 return d
339 return d
337 else:
340 else:
338 return defer.succeed(None)
341 return defer.succeed(None)
339
342
340 def stop_engines(self, r=None):
343 def stop_engines(self, r=None):
341 # log.msg("In stop_engines")
344 # log.msg("In stop_engines")
342 if self.engine_launcher.running:
345 if self.engine_launcher.running:
343 d = self.engine_launcher.stop()
346 d = self.engine_launcher.stop()
344 d.addErrback(self.log_err)
347 d.addErrback(self.log_err)
345 return d
348 return d
346 else:
349 else:
347 return defer.succeed(None)
350 return defer.succeed(None)
348
351
349 def log_err(self, f):
352 def log_err(self, f):
350 log.msg(f.getTraceback())
353 log.msg(f.getTraceback())
351 return None
354 return None
352
355
353 def stop_launchers(self, r=None):
356 def stop_launchers(self, r=None):
354 if not self._stopping:
357 if not self._stopping:
355 self._stopping = True
358 self._stopping = True
356 if isinstance(r, failure.Failure):
359 if isinstance(r, failure.Failure):
357 log.msg('Unexpected error in ipcluster:')
360 log.msg('Unexpected error in ipcluster:')
358 log.msg(r.getTraceback())
361 log.msg(r.getTraceback())
359 log.msg("IPython cluster: stopping")
362 log.msg("IPython cluster: stopping")
360 d= self.stop_engines()
363 d= self.stop_engines()
361 d2 = self.stop_controller()
364 d2 = self.stop_controller()
362 # Wait a few seconds to let things shut down.
365 # Wait a few seconds to let things shut down.
363 reactor.callLater(4.0, reactor.stop)
366 reactor.callLater(4.0, reactor.stop)
364
367
365 def sigint_handler(self, signum, frame):
368 def sigint_handler(self, signum, frame):
366 self.stop_launchers()
369 self.stop_launchers()
367
370
368 def start_logging(self):
371 def start_logging(self):
369 # Remove old log files of the controller and engine
372 # Remove old log files of the controller and engine
370 if self.master_config.Global.clean_logs:
373 if self.master_config.Global.clean_logs:
371 log_dir = self.master_config.Global.log_dir
374 log_dir = self.master_config.Global.log_dir
372 for f in os.listdir(log_dir):
375 for f in os.listdir(log_dir):
373 if f.startswith('ipengine' + '-'):
376 if f.startswith('ipengine' + '-'):
374 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
377 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
375 os.remove(os.path.join(log_dir, f))
378 os.remove(os.path.join(log_dir, f))
376 if f.startswith('ipcontroller' + '-'):
379 if f.startswith('ipcontroller' + '-'):
377 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
380 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
378 os.remove(os.path.join(log_dir, f))
381 os.remove(os.path.join(log_dir, f))
379 # This will remote old log files for ipcluster itself
382 # This will remote old log files for ipcluster itself
380 super(IPClusterApp, self).start_logging()
383 super(IPClusterApp, self).start_logging()
381
384
382 def start_app(self):
385 def start_app(self):
383 """Start the application, depending on what subcommand is used."""
386 """Start the application, depending on what subcommand is used."""
384 subcmd = self.master_config.Global.subcommand
387 subcmd = self.master_config.Global.subcommand
385 if subcmd=='create' or subcmd=='list':
388 if subcmd=='create' or subcmd=='list':
386 return
389 return
387 elif subcmd=='start':
390 elif subcmd=='start':
388 self.start_app_start()
391 self.start_app_start()
389 elif subcmd=='stop':
392 elif subcmd=='stop':
390 self.start_app_stop()
393 self.start_app_stop()
391
394
392 def start_app_start(self):
395 def start_app_start(self):
393 """Start the app for the start subcommand."""
396 """Start the app for the start subcommand."""
394 config = self.master_config
397 config = self.master_config
395 # First see if the cluster is already running
398 # First see if the cluster is already running
396 try:
399 try:
397 pid = self.get_pid_from_file()
400 pid = self.get_pid_from_file()
398 except PIDFileError:
401 except PIDFileError:
399 pass
402 pass
400 else:
403 else:
401 self.log.critical(
404 self.log.critical(
402 'Cluster is already running with [pid=%s]. '
405 'Cluster is already running with [pid=%s]. '
403 'use "ipcluster stop" to stop the cluster.' % pid
406 'use "ipcluster stop" to stop the cluster.' % pid
404 )
407 )
405 # Here I exit with a unusual exit status that other processes
408 # Here I exit with a unusual exit status that other processes
406 # can watch for to learn how I existed.
409 # can watch for to learn how I existed.
407 self.exit(ALREADY_STARTED)
410 self.exit(ALREADY_STARTED)
408
411
409 # Now log and daemonize
412 # Now log and daemonize
410 self.log.info(
413 self.log.info(
411 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
414 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
412 )
415 )
416 # TODO: Get daemonize working on Windows or as a Windows Server.
413 if config.Global.daemonize:
417 if config.Global.daemonize:
414 if os.name=='posix':
418 if os.name=='posix':
415 daemonize()
419 daemonize()
416
420
417 # Now write the new pid file AFTER our new forked pid is active.
421 # Now write the new pid file AFTER our new forked pid is active.
418 self.write_pid_file()
422 self.write_pid_file()
419 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
423 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
420 reactor.run()
424 reactor.run()
421
425
422 def start_app_stop(self):
426 def start_app_stop(self):
423 """Start the app for the stop subcommand."""
427 """Start the app for the stop subcommand."""
424 config = self.master_config
428 config = self.master_config
425 try:
429 try:
426 pid = self.get_pid_from_file()
430 pid = self.get_pid_from_file()
427 except PIDFileError:
431 except PIDFileError:
428 self.log.critical(
432 self.log.critical(
429 'Problem reading pid file, cluster is probably not running.'
433 'Problem reading pid file, cluster is probably not running.'
430 )
434 )
431 # Here I exit with a unusual exit status that other processes
435 # Here I exit with a unusual exit status that other processes
432 # can watch for to learn how I existed.
436 # can watch for to learn how I existed.
433 self.exit(ALREADY_STOPPED)
437 self.exit(ALREADY_STOPPED)
434 else:
438 else:
435 if os.name=='posix':
439 if os.name=='posix':
436 sig = config.Global.signal
440 sig = config.Global.signal
437 self.log.info(
441 self.log.info(
438 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
442 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
439 )
443 )
440 os.kill(pid, sig)
444 os.kill(pid, sig)
441 elif os.name=='nt':
445 elif os.name=='nt':
442 # As of right now, we don't support daemonize on Windows, so
446 # As of right now, we don't support daemonize on Windows, so
443 # stop will not do anything. Minimally, it should clean up the
447 # stop will not do anything. Minimally, it should clean up the
444 # old .pid files.
448 # old .pid files.
445 self.remove_pid_file()
449 self.remove_pid_file()
446
450
447 def launch_new_instance():
451 def launch_new_instance():
448 """Create and run the IPython cluster."""
452 """Create and run the IPython cluster."""
449 app = IPClusterApp()
453 app = IPClusterApp()
450 app.start()
454 app.start()
451
455
452
456
453 if __name__ == '__main__':
457 if __name__ == '__main__':
454 launch_new_instance()
458 launch_new_instance()
455
459
@@ -1,867 +1,869 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 from IPython.utils.platutils import find_cmd
25 from IPython.utils.platutils import find_cmd
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask,
29 IPControllerTask, IPEngineTask,
30 IPControllerJob, IPEngineSetJob
30 IPControllerJob, IPEngineSetJob
31 )
31 )
32
32
33 from twisted.internet import reactor, defer
33 from twisted.internet import reactor, defer
34 from twisted.internet.defer import inlineCallbacks
34 from twisted.internet.defer import inlineCallbacks
35 from twisted.internet.protocol import ProcessProtocol
35 from twisted.internet.protocol import ProcessProtocol
36 from twisted.internet.utils import getProcessOutput
36 from twisted.internet.utils import getProcessOutput
37 from twisted.internet.error import ProcessDone, ProcessTerminated
37 from twisted.internet.error import ProcessDone, ProcessTerminated
38 from twisted.python import log
38 from twisted.python import log
39 from twisted.python.failure import Failure
39 from twisted.python.failure import Failure
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Utilities
42 # Utilities
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 def find_controller_cmd():
46 def find_controller_cmd():
47 """Find the command line ipcontroller program in a cross platform way."""
47 """Find the command line ipcontroller program in a cross platform way."""
48 if sys.platform == 'win32':
48 if sys.platform == 'win32':
49 # This logic is needed because the ipcontroller script doesn't
49 # This logic is needed because the ipcontroller script doesn't
50 # always get installed in the same way or in the same location.
50 # always get installed in the same way or in the same location.
51 from IPython.kernel import ipcontrollerapp
51 from IPython.kernel import ipcontrollerapp
52 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
52 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
53 # The -u option here turns on unbuffered output, which is required
53 # The -u option here turns on unbuffered output, which is required
54 # on Win32 to prevent wierd conflict and problems with Twisted.
54 # on Win32 to prevent wierd conflict and problems with Twisted.
55 # Also, use sys.executable to make sure we are picking up the
55 # Also, use sys.executable to make sure we are picking up the
56 # right python exe.
56 # right python exe.
57 cmd = [sys.executable, '-u', script_location]
57 cmd = [sys.executable, '-u', script_location]
58 else:
58 else:
59 # ipcontroller has to be on the PATH in this case.
59 # ipcontroller has to be on the PATH in this case.
60 cmd = ['ipcontroller']
60 cmd = ['ipcontroller']
61 return cmd
61 return cmd
62
62
63
63
64 def find_engine_cmd():
64 def find_engine_cmd():
65 """Find the command line ipengine program in a cross platform way."""
65 """Find the command line ipengine program in a cross platform way."""
66 if sys.platform == 'win32':
66 if sys.platform == 'win32':
67 # This logic is needed because the ipengine script doesn't
67 # This logic is needed because the ipengine script doesn't
68 # always get installed in the same way or in the same location.
68 # always get installed in the same way or in the same location.
69 from IPython.kernel import ipengineapp
69 from IPython.kernel import ipengineapp
70 script_location = ipengineapp.__file__.replace('.pyc', '.py')
70 script_location = ipengineapp.__file__.replace('.pyc', '.py')
71 # The -u option here turns on unbuffered output, which is required
71 # The -u option here turns on unbuffered output, which is required
72 # on Win32 to prevent wierd conflict and problems with Twisted.
72 # on Win32 to prevent wierd conflict and problems with Twisted.
73 # Also, use sys.executable to make sure we are picking up the
73 # Also, use sys.executable to make sure we are picking up the
74 # right python exe.
74 # right python exe.
75 cmd = [sys.executable, '-u', script_location]
75 cmd = [sys.executable, '-u', script_location]
76 else:
76 else:
77 # ipcontroller has to be on the PATH in this case.
77 # ipcontroller has to be on the PATH in this case.
78 cmd = ['ipengine']
78 cmd = ['ipengine']
79 return cmd
79 return cmd
80
80
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # Base launchers and errors
83 # Base launchers and errors
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86
86
87 class LauncherError(Exception):
87 class LauncherError(Exception):
88 pass
88 pass
89
89
90
90
91 class ProcessStateError(LauncherError):
91 class ProcessStateError(LauncherError):
92 pass
92 pass
93
93
94
94
95 class UnknownStatus(LauncherError):
95 class UnknownStatus(LauncherError):
96 pass
96 pass
97
97
98
98
99 class BaseLauncher(Component):
99 class BaseLauncher(Component):
100 """An asbtraction for starting, stopping and signaling a process."""
100 """An asbtraction for starting, stopping and signaling a process."""
101
101
102 # In all of the launchers, the work_dir is where child processes will be
102 # In all of the launchers, the work_dir is where child processes will be
103 # run. This will usually be the cluster_dir, but may not be. any work_dir
103 # run. This will usually be the cluster_dir, but may not be. any work_dir
104 # passed into the __init__ method will override the config value.
104 # passed into the __init__ method will override the config value.
105 # This should not be used to set the work_dir for the actual engine
105 # This should not be used to set the work_dir for the actual engine
106 # and controller. Instead, use their own config files or the
106 # and controller. Instead, use their own config files or the
107 # controller_args, engine_args attributes of the launchers to add
107 # controller_args, engine_args attributes of the launchers to add
108 # the --work-dir option.
108 # the --work-dir option.
109 work_dir = Unicode(u'')
109 work_dir = Unicode(u'')
110
110
111 def __init__(self, work_dir, parent=None, name=None, config=None):
111 def __init__(self, work_dir, parent=None, name=None, config=None):
112 super(BaseLauncher, self).__init__(parent, name, config)
112 super(BaseLauncher, self).__init__(parent, name, config)
113 self.work_dir = work_dir
113 self.work_dir = work_dir
114 self.state = 'before' # can be before, running, after
114 self.state = 'before' # can be before, running, after
115 self.stop_deferreds = []
115 self.stop_deferreds = []
116 self.start_data = None
116 self.start_data = None
117 self.stop_data = None
117 self.stop_data = None
118
118
119 @property
119 @property
120 def args(self):
120 def args(self):
121 """A list of cmd and args that will be used to start the process.
121 """A list of cmd and args that will be used to start the process.
122
122
123 This is what is passed to :func:`spawnProcess` and the first element
123 This is what is passed to :func:`spawnProcess` and the first element
124 will be the process name.
124 will be the process name.
125 """
125 """
126 return self.find_args()
126 return self.find_args()
127
127
128 def find_args(self):
128 def find_args(self):
129 """The ``.args`` property calls this to find the args list.
129 """The ``.args`` property calls this to find the args list.
130
130
131 Subcommand should implement this to construct the cmd and args.
131 Subcommand should implement this to construct the cmd and args.
132 """
132 """
133 raise NotImplementedError('find_args must be implemented in a subclass')
133 raise NotImplementedError('find_args must be implemented in a subclass')
134
134
135 @property
135 @property
136 def arg_str(self):
136 def arg_str(self):
137 """The string form of the program arguments."""
137 """The string form of the program arguments."""
138 return ' '.join(self.args)
138 return ' '.join(self.args)
139
139
140 @property
140 @property
141 def running(self):
141 def running(self):
142 """Am I running."""
142 """Am I running."""
143 if self.state == 'running':
143 if self.state == 'running':
144 return True
144 return True
145 else:
145 else:
146 return False
146 return False
147
147
148 def start(self):
148 def start(self):
149 """Start the process.
149 """Start the process.
150
150
151 This must return a deferred that fires with information about the
151 This must return a deferred that fires with information about the
152 process starting (like a pid, job id, etc.).
152 process starting (like a pid, job id, etc.).
153 """
153 """
154 return defer.fail(
154 return defer.fail(
155 Failure(NotImplementedError(
155 Failure(NotImplementedError(
156 'start must be implemented in a subclass')
156 'start must be implemented in a subclass')
157 )
157 )
158 )
158 )
159
159
160 def stop(self):
160 def stop(self):
161 """Stop the process and notify observers of stopping.
161 """Stop the process and notify observers of stopping.
162
162
163 This must return a deferred that fires with information about the
163 This must return a deferred that fires with information about the
164 processing stopping, like errors that occur while the process is
164 processing stopping, like errors that occur while the process is
165 attempting to be shut down. This deferred won't fire when the process
165 attempting to be shut down. This deferred won't fire when the process
166 actually stops. To observe the actual process stopping, see
166 actually stops. To observe the actual process stopping, see
167 :func:`observe_stop`.
167 :func:`observe_stop`.
168 """
168 """
169 return defer.fail(
169 return defer.fail(
170 Failure(NotImplementedError(
170 Failure(NotImplementedError(
171 'stop must be implemented in a subclass')
171 'stop must be implemented in a subclass')
172 )
172 )
173 )
173 )
174
174
175 def observe_stop(self):
175 def observe_stop(self):
176 """Get a deferred that will fire when the process stops.
176 """Get a deferred that will fire when the process stops.
177
177
178 The deferred will fire with data that contains information about
178 The deferred will fire with data that contains information about
179 the exit status of the process.
179 the exit status of the process.
180 """
180 """
181 if self.state=='after':
181 if self.state=='after':
182 return defer.succeed(self.stop_data)
182 return defer.succeed(self.stop_data)
183 else:
183 else:
184 d = defer.Deferred()
184 d = defer.Deferred()
185 self.stop_deferreds.append(d)
185 self.stop_deferreds.append(d)
186 return d
186 return d
187
187
188 def notify_start(self, data):
188 def notify_start(self, data):
189 """Call this to trigger startup actions.
189 """Call this to trigger startup actions.
190
190
191 This logs the process startup and sets the state to 'running'. It is
191 This logs the process startup and sets the state to 'running'. It is
192 a pass-through so it can be used as a callback.
192 a pass-through so it can be used as a callback.
193 """
193 """
194
194
195 log.msg('Process %r started: %r' % (self.args[0], data))
195 log.msg('Process %r started: %r' % (self.args[0], data))
196 self.start_data = data
196 self.start_data = data
197 self.state = 'running'
197 self.state = 'running'
198 return data
198 return data
199
199
200 def notify_stop(self, data):
200 def notify_stop(self, data):
201 """Call this to trigger process stop actions.
201 """Call this to trigger process stop actions.
202
202
203 This logs the process stopping and sets the state to 'after'. Call
203 This logs the process stopping and sets the state to 'after'. Call
204 this to trigger all the deferreds from :func:`observe_stop`."""
204 this to trigger all the deferreds from :func:`observe_stop`."""
205
205
206 log.msg('Process %r stopped: %r' % (self.args[0], data))
206 log.msg('Process %r stopped: %r' % (self.args[0], data))
207 self.stop_data = data
207 self.stop_data = data
208 self.state = 'after'
208 self.state = 'after'
209 for i in range(len(self.stop_deferreds)):
209 for i in range(len(self.stop_deferreds)):
210 d = self.stop_deferreds.pop()
210 d = self.stop_deferreds.pop()
211 d.callback(data)
211 d.callback(data)
212 return data
212 return data
213
213
214 def signal(self, sig):
214 def signal(self, sig):
215 """Signal the process.
215 """Signal the process.
216
216
217 Return a semi-meaningless deferred after signaling the process.
217 Return a semi-meaningless deferred after signaling the process.
218
218
219 Parameters
219 Parameters
220 ----------
220 ----------
221 sig : str or int
221 sig : str or int
222 'KILL', 'INT', etc., or any signal number
222 'KILL', 'INT', etc., or any signal number
223 """
223 """
224 return defer.fail(
224 return defer.fail(
225 Failure(NotImplementedError(
225 Failure(NotImplementedError(
226 'signal must be implemented in a subclass')
226 'signal must be implemented in a subclass')
227 )
227 )
228 )
228 )
229
229
230
230
231 #-----------------------------------------------------------------------------
231 #-----------------------------------------------------------------------------
232 # Local process launchers
232 # Local process launchers
233 #-----------------------------------------------------------------------------
233 #-----------------------------------------------------------------------------
234
234
235
235
236 class LocalProcessLauncherProtocol(ProcessProtocol):
236 class LocalProcessLauncherProtocol(ProcessProtocol):
237 """A ProcessProtocol to go with the LocalProcessLauncher."""
237 """A ProcessProtocol to go with the LocalProcessLauncher."""
238
238
239 def __init__(self, process_launcher):
239 def __init__(self, process_launcher):
240 self.process_launcher = process_launcher
240 self.process_launcher = process_launcher
241 self.pid = None
241 self.pid = None
242
242
243 def connectionMade(self):
243 def connectionMade(self):
244 self.pid = self.transport.pid
244 self.pid = self.transport.pid
245 self.process_launcher.notify_start(self.transport.pid)
245 self.process_launcher.notify_start(self.transport.pid)
246
246
247 def processEnded(self, status):
247 def processEnded(self, status):
248 value = status.value
248 value = status.value
249 if isinstance(value, ProcessDone):
249 if isinstance(value, ProcessDone):
250 self.process_launcher.notify_stop(
250 self.process_launcher.notify_stop(
251 {'exit_code':0,
251 {'exit_code':0,
252 'signal':None,
252 'signal':None,
253 'status':None,
253 'status':None,
254 'pid':self.pid
254 'pid':self.pid
255 }
255 }
256 )
256 )
257 elif isinstance(value, ProcessTerminated):
257 elif isinstance(value, ProcessTerminated):
258 self.process_launcher.notify_stop(
258 self.process_launcher.notify_stop(
259 {'exit_code':value.exitCode,
259 {'exit_code':value.exitCode,
260 'signal':value.signal,
260 'signal':value.signal,
261 'status':value.status,
261 'status':value.status,
262 'pid':self.pid
262 'pid':self.pid
263 }
263 }
264 )
264 )
265 else:
265 else:
266 raise UnknownStatus("Unknown exit status, this is probably a "
266 raise UnknownStatus("Unknown exit status, this is probably a "
267 "bug in Twisted")
267 "bug in Twisted")
268
268
269 def outReceived(self, data):
269 def outReceived(self, data):
270 log.msg(data)
270 log.msg(data)
271
271
272 def errReceived(self, data):
272 def errReceived(self, data):
273 log.err(data)
273 log.err(data)
274
274
275
275
276 class LocalProcessLauncher(BaseLauncher):
276 class LocalProcessLauncher(BaseLauncher):
277 """Start and stop an external process in an asynchronous manner.
277 """Start and stop an external process in an asynchronous manner.
278
278
279 This will launch the external process with a working directory of
279 This will launch the external process with a working directory of
280 ``self.work_dir``.
280 ``self.work_dir``.
281 """
281 """
282
282
283 # This is used to to construct self.args, which is passed to
283 # This is used to to construct self.args, which is passed to
284 # spawnProcess.
284 # spawnProcess.
285 cmd_and_args = List([])
285 cmd_and_args = List([])
286
286
287 def __init__(self, work_dir, parent=None, name=None, config=None):
287 def __init__(self, work_dir, parent=None, name=None, config=None):
288 super(LocalProcessLauncher, self).__init__(
288 super(LocalProcessLauncher, self).__init__(
289 work_dir, parent, name, config
289 work_dir, parent, name, config
290 )
290 )
291 self.process_protocol = None
291 self.process_protocol = None
292 self.start_deferred = None
292 self.start_deferred = None
293
293
294 def find_args(self):
294 def find_args(self):
295 return self.cmd_and_args
295 return self.cmd_and_args
296
296
297 def start(self):
297 def start(self):
298 if self.state == 'before':
298 if self.state == 'before':
299 self.process_protocol = LocalProcessLauncherProtocol(self)
299 self.process_protocol = LocalProcessLauncherProtocol(self)
300 self.start_deferred = defer.Deferred()
300 self.start_deferred = defer.Deferred()
301 self.process_transport = reactor.spawnProcess(
301 self.process_transport = reactor.spawnProcess(
302 self.process_protocol,
302 self.process_protocol,
303 str(self.args[0]), # twisted expects these to be str, not unicode
303 str(self.args[0]), # twisted expects these to be str, not unicode
304 [str(a) for a in self.args], # str expected, not unicode
304 [str(a) for a in self.args], # str expected, not unicode
305 env=os.environ,
305 env=os.environ,
306 path=self.work_dir # start in the work_dir
306 path=self.work_dir # start in the work_dir
307 )
307 )
308 return self.start_deferred
308 return self.start_deferred
309 else:
309 else:
310 s = 'The process was already started and has state: %r' % self.state
310 s = 'The process was already started and has state: %r' % self.state
311 return defer.fail(ProcessStateError(s))
311 return defer.fail(ProcessStateError(s))
312
312
313 def notify_start(self, data):
313 def notify_start(self, data):
314 super(LocalProcessLauncher, self).notify_start(data)
314 super(LocalProcessLauncher, self).notify_start(data)
315 self.start_deferred.callback(data)
315 self.start_deferred.callback(data)
316
316
317 def stop(self):
317 def stop(self):
318 return self.interrupt_then_kill()
318 return self.interrupt_then_kill()
319
319
320 @make_deferred
320 @make_deferred
321 def signal(self, sig):
321 def signal(self, sig):
322 if self.state == 'running':
322 if self.state == 'running':
323 self.process_transport.signalProcess(sig)
323 self.process_transport.signalProcess(sig)
324
324
325 @inlineCallbacks
325 @inlineCallbacks
326 def interrupt_then_kill(self, delay=2.0):
326 def interrupt_then_kill(self, delay=2.0):
327 """Send INT, wait a delay and then send KILL."""
327 """Send INT, wait a delay and then send KILL."""
328 yield self.signal('INT')
328 yield self.signal('INT')
329 yield sleep_deferred(delay)
329 yield sleep_deferred(delay)
330 yield self.signal('KILL')
330 yield self.signal('KILL')
331
331
332
332
333 class LocalControllerLauncher(LocalProcessLauncher):
333 class LocalControllerLauncher(LocalProcessLauncher):
334 """Launch a controller as a regular external process."""
334 """Launch a controller as a regular external process."""
335
335
336 controller_cmd = List(find_controller_cmd(), config=True)
336 controller_cmd = List(find_controller_cmd(), config=True)
337 # Command line arguments to ipcontroller.
337 # Command line arguments to ipcontroller.
338 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
338 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
339
339
340 def find_args(self):
340 def find_args(self):
341 return self.controller_cmd + self.controller_args
341 return self.controller_cmd + self.controller_args
342
342
343 def start(self, cluster_dir):
343 def start(self, cluster_dir):
344 """Start the controller by cluster_dir."""
344 """Start the controller by cluster_dir."""
345 self.controller_args.extend(['--cluster-dir', cluster_dir])
345 self.controller_args.extend(['--cluster-dir', cluster_dir])
346 self.cluster_dir = unicode(cluster_dir)
346 self.cluster_dir = unicode(cluster_dir)
347 log.msg("Starting LocalControllerLauncher: %r" % self.args)
347 log.msg("Starting LocalControllerLauncher: %r" % self.args)
348 return super(LocalControllerLauncher, self).start()
348 return super(LocalControllerLauncher, self).start()
349
349
350
350
351 class LocalEngineLauncher(LocalProcessLauncher):
351 class LocalEngineLauncher(LocalProcessLauncher):
352 """Launch a single engine as a regular externall process."""
352 """Launch a single engine as a regular externall process."""
353
353
354 engine_cmd = List(find_engine_cmd(), config=True)
354 engine_cmd = List(find_engine_cmd(), config=True)
355 # Command line arguments for ipengine.
355 # Command line arguments for ipengine.
356 engine_args = List(
356 engine_args = List(
357 ['--log-to-file','--log-level', '40'], config=True
357 ['--log-to-file','--log-level', '40'], config=True
358 )
358 )
359
359
360 def find_args(self):
360 def find_args(self):
361 return self.engine_cmd + self.engine_args
361 return self.engine_cmd + self.engine_args
362
362
363 def start(self, cluster_dir):
363 def start(self, cluster_dir):
364 """Start the engine by cluster_dir."""
364 """Start the engine by cluster_dir."""
365 self.engine_args.extend(['--cluster-dir', cluster_dir])
365 self.engine_args.extend(['--cluster-dir', cluster_dir])
366 self.cluster_dir = unicode(cluster_dir)
366 self.cluster_dir = unicode(cluster_dir)
367 return super(LocalEngineLauncher, self).start()
367 return super(LocalEngineLauncher, self).start()
368
368
369
369
370 class LocalEngineSetLauncher(BaseLauncher):
370 class LocalEngineSetLauncher(BaseLauncher):
371 """Launch a set of engines as regular external processes."""
371 """Launch a set of engines as regular external processes."""
372
372
373 # Command line arguments for ipengine.
373 # Command line arguments for ipengine.
374 engine_args = List(
374 engine_args = List(
375 ['--log-to-file','--log-level', '40'], config=True
375 ['--log-to-file','--log-level', '40'], config=True
376 )
376 )
377
377
378 def __init__(self, work_dir, parent=None, name=None, config=None):
378 def __init__(self, work_dir, parent=None, name=None, config=None):
379 super(LocalEngineSetLauncher, self).__init__(
379 super(LocalEngineSetLauncher, self).__init__(
380 work_dir, parent, name, config
380 work_dir, parent, name, config
381 )
381 )
382 self.launchers = []
382 self.launchers = []
383
383
384 def start(self, n, cluster_dir):
384 def start(self, n, cluster_dir):
385 """Start n engines by profile or cluster_dir."""
385 """Start n engines by profile or cluster_dir."""
386 self.cluster_dir = unicode(cluster_dir)
386 self.cluster_dir = unicode(cluster_dir)
387 dlist = []
387 dlist = []
388 for i in range(n):
388 for i in range(n):
389 el = LocalEngineLauncher(self.work_dir, self)
389 el = LocalEngineLauncher(self.work_dir, self)
390 # Copy the engine args over to each engine launcher.
390 # Copy the engine args over to each engine launcher.
391 import copy
391 import copy
392 el.engine_args = copy.deepcopy(self.engine_args)
392 el.engine_args = copy.deepcopy(self.engine_args)
393 d = el.start(cluster_dir)
393 d = el.start(cluster_dir)
394 if i==0:
394 if i==0:
395 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
395 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
396 self.launchers.append(el)
396 self.launchers.append(el)
397 dlist.append(d)
397 dlist.append(d)
398 # The consumeErrors here could be dangerous
398 # The consumeErrors here could be dangerous
399 dfinal = gatherBoth(dlist, consumeErrors=True)
399 dfinal = gatherBoth(dlist, consumeErrors=True)
400 dfinal.addCallback(self.notify_start)
400 dfinal.addCallback(self.notify_start)
401 return dfinal
401 return dfinal
402
402
403 def find_args(self):
403 def find_args(self):
404 return ['engine set']
404 return ['engine set']
405
405
406 def signal(self, sig):
406 def signal(self, sig):
407 dlist = []
407 dlist = []
408 for el in self.launchers:
408 for el in self.launchers:
409 d = el.signal(sig)
409 d = el.signal(sig)
410 dlist.append(d)
410 dlist.append(d)
411 dfinal = gatherBoth(dlist, consumeErrors=True)
411 dfinal = gatherBoth(dlist, consumeErrors=True)
412 return dfinal
412 return dfinal
413
413
414 def interrupt_then_kill(self, delay=1.0):
414 def interrupt_then_kill(self, delay=1.0):
415 dlist = []
415 dlist = []
416 for el in self.launchers:
416 for el in self.launchers:
417 d = el.interrupt_then_kill(delay)
417 d = el.interrupt_then_kill(delay)
418 dlist.append(d)
418 dlist.append(d)
419 dfinal = gatherBoth(dlist, consumeErrors=True)
419 dfinal = gatherBoth(dlist, consumeErrors=True)
420 return dfinal
420 return dfinal
421
421
422 def stop(self):
422 def stop(self):
423 return self.interrupt_then_kill()
423 return self.interrupt_then_kill()
424
424
425 def observe_stop(self):
425 def observe_stop(self):
426 dlist = [el.observe_stop() for el in self.launchers]
426 dlist = [el.observe_stop() for el in self.launchers]
427 dfinal = gatherBoth(dlist, consumeErrors=False)
427 dfinal = gatherBoth(dlist, consumeErrors=False)
428 dfinal.addCallback(self.notify_stop)
428 dfinal.addCallback(self.notify_stop)
429 return dfinal
429 return dfinal
430
430
431
431
432 #-----------------------------------------------------------------------------
432 #-----------------------------------------------------------------------------
433 # MPIExec launchers
433 # MPIExec launchers
434 #-----------------------------------------------------------------------------
434 #-----------------------------------------------------------------------------
435
435
436
436
437 class MPIExecLauncher(LocalProcessLauncher):
437 class MPIExecLauncher(LocalProcessLauncher):
438 """Launch an external process using mpiexec."""
438 """Launch an external process using mpiexec."""
439
439
440 # The mpiexec command to use in starting the process.
440 # The mpiexec command to use in starting the process.
441 mpi_cmd = List(['mpiexec'], config=True)
441 mpi_cmd = List(['mpiexec'], config=True)
442 # The command line arguments to pass to mpiexec.
442 # The command line arguments to pass to mpiexec.
443 mpi_args = List([], config=True)
443 mpi_args = List([], config=True)
444 # The program to start using mpiexec.
444 # The program to start using mpiexec.
445 program = List(['date'], config=True)
445 program = List(['date'], config=True)
446 # The command line argument to the program.
446 # The command line argument to the program.
447 program_args = List([], config=True)
447 program_args = List([], config=True)
448 # The number of instances of the program to start.
448 # The number of instances of the program to start.
449 n = Int(1, config=True)
449 n = Int(1, config=True)
450
450
451 def find_args(self):
451 def find_args(self):
452 """Build self.args using all the fields."""
452 """Build self.args using all the fields."""
453 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
453 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
454 self.program + self.program_args
454 self.program + self.program_args
455
455
456 def start(self, n):
456 def start(self, n):
457 """Start n instances of the program using mpiexec."""
457 """Start n instances of the program using mpiexec."""
458 self.n = n
458 self.n = n
459 return super(MPIExecLauncher, self).start()
459 return super(MPIExecLauncher, self).start()
460
460
461
461
462 class MPIExecControllerLauncher(MPIExecLauncher):
462 class MPIExecControllerLauncher(MPIExecLauncher):
463 """Launch a controller using mpiexec."""
463 """Launch a controller using mpiexec."""
464
464
465 controller_cmd = List(find_controller_cmd(), config=True)
465 controller_cmd = List(find_controller_cmd(), config=True)
466 # Command line arguments to ipcontroller.
466 # Command line arguments to ipcontroller.
467 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
467 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
468 n = Int(1, config=False)
468 n = Int(1, config=False)
469
469
470 def start(self, cluster_dir):
470 def start(self, cluster_dir):
471 """Start the controller by cluster_dir."""
471 """Start the controller by cluster_dir."""
472 self.controller_args.extend(['--cluster-dir', cluster_dir])
472 self.controller_args.extend(['--cluster-dir', cluster_dir])
473 self.cluster_dir = unicode(cluster_dir)
473 self.cluster_dir = unicode(cluster_dir)
474 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
474 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
475 return super(MPIExecControllerLauncher, self).start(1)
475 return super(MPIExecControllerLauncher, self).start(1)
476
476
477 def find_args(self):
477 def find_args(self):
478 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
478 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
479 self.controller_cmd + self.controller_args
479 self.controller_cmd + self.controller_args
480
480
481
481
482 class MPIExecEngineSetLauncher(MPIExecLauncher):
482 class MPIExecEngineSetLauncher(MPIExecLauncher):
483
483
484 engine_cmd = List(find_engine_cmd(), config=True)
484 engine_cmd = List(find_engine_cmd(), config=True)
485 # Command line arguments for ipengine.
485 # Command line arguments for ipengine.
486 engine_args = List(
486 engine_args = List(
487 ['--log-to-file','--log-level', '40'], config=True
487 ['--log-to-file','--log-level', '40'], config=True
488 )
488 )
489 n = Int(1, config=True)
489 n = Int(1, config=True)
490
490
491 def start(self, n, cluster_dir):
491 def start(self, n, cluster_dir):
492 """Start n engines by profile or cluster_dir."""
492 """Start n engines by profile or cluster_dir."""
493 self.engine_args.extend(['--cluster-dir', cluster_dir])
493 self.engine_args.extend(['--cluster-dir', cluster_dir])
494 self.cluster_dir = unicode(cluster_dir)
494 self.cluster_dir = unicode(cluster_dir)
495 self.n = n
495 self.n = n
496 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
496 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
497 return super(MPIExecEngineSetLauncher, self).start(n)
497 return super(MPIExecEngineSetLauncher, self).start(n)
498
498
499 def find_args(self):
499 def find_args(self):
500 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
500 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
501 self.engine_cmd + self.engine_args
501 self.engine_cmd + self.engine_args
502
502
503
503
504 #-----------------------------------------------------------------------------
504 #-----------------------------------------------------------------------------
505 # SSH launchers
505 # SSH launchers
506 #-----------------------------------------------------------------------------
506 #-----------------------------------------------------------------------------
507
507
508 # TODO: Get SSH Launcher working again.
508
509
509 class SSHLauncher(BaseLauncher):
510 class SSHLauncher(BaseLauncher):
510 """A minimal launcher for ssh.
511 """A minimal launcher for ssh.
511
512
512 To be useful this will probably have to be extended to use the ``sshx``
513 To be useful this will probably have to be extended to use the ``sshx``
513 idea for environment variables. There could be other things this needs
514 idea for environment variables. There could be other things this needs
514 as well.
515 as well.
515 """
516 """
516
517
517 ssh_cmd = List(['ssh'], config=True)
518 ssh_cmd = List(['ssh'], config=True)
518 ssh_args = List([], config=True)
519 ssh_args = List([], config=True)
519 program = List(['date'], config=True)
520 program = List(['date'], config=True)
520 program_args = List([], config=True)
521 program_args = List([], config=True)
521 hostname = Str('', config=True)
522 hostname = Str('', config=True)
522 user = Str('', config=True)
523 user = Str('', config=True)
523 location = Str('')
524 location = Str('')
524
525
525 def _hostname_changed(self, name, old, new):
526 def _hostname_changed(self, name, old, new):
526 self.location = '%s@%s' % (self.user, new)
527 self.location = '%s@%s' % (self.user, new)
527
528
528 def _user_changed(self, name, old, new):
529 def _user_changed(self, name, old, new):
529 self.location = '%s@%s' % (new, self.hostname)
530 self.location = '%s@%s' % (new, self.hostname)
530
531
531 def find_args(self):
532 def find_args(self):
532 return self.ssh_cmd + self.ssh_args + [self.location] + \
533 return self.ssh_cmd + self.ssh_args + [self.location] + \
533 self.program + self.program_args
534 self.program + self.program_args
534
535
535 def start(self, n, hostname=None, user=None):
536 def start(self, n, hostname=None, user=None):
536 if hostname is not None:
537 if hostname is not None:
537 self.hostname = hostname
538 self.hostname = hostname
538 if user is not None:
539 if user is not None:
539 self.user = user
540 self.user = user
540 return super(SSHLauncher, self).start()
541 return super(SSHLauncher, self).start()
541
542
542
543
543 class SSHControllerLauncher(SSHLauncher):
544 class SSHControllerLauncher(SSHLauncher):
544 pass
545 pass
545
546
546
547
547 class SSHEngineSetLauncher(BaseLauncher):
548 class SSHEngineSetLauncher(BaseLauncher):
548 pass
549 pass
549
550
550
551
551 #-----------------------------------------------------------------------------
552 #-----------------------------------------------------------------------------
552 # Windows HPC Server 2008 scheduler launchers
553 # Windows HPC Server 2008 scheduler launchers
553 #-----------------------------------------------------------------------------
554 #-----------------------------------------------------------------------------
554
555
555
556
556 # This is only used on Windows.
557 # This is only used on Windows.
557 def find_job_cmd():
558 def find_job_cmd():
558 if os.name=='nt':
559 if os.name=='nt':
559 return find_cmd('job')
560 return find_cmd('job')
560 else:
561 else:
561 return 'job'
562 return 'job'
562
563
563
564
564 class WindowsHPCLauncher(BaseLauncher):
565 class WindowsHPCLauncher(BaseLauncher):
565
566
566 # A regular expression used to get the job id from the output of the
567 # A regular expression used to get the job id from the output of the
567 # submit_command.
568 # submit_command.
568 job_id_regexp = Str('\d+', config=True)
569 job_id_regexp = Str(r'\d+', config=True)
569 # The filename of the instantiated job script.
570 # The filename of the instantiated job script.
570 job_file_name = Unicode(u'ipython_job.xml', config=True)
571 job_file_name = Unicode(u'ipython_job.xml', config=True)
571 # The full path to the instantiated job script. This gets made dynamically
572 # The full path to the instantiated job script. This gets made dynamically
572 # by combining the work_dir with the job_file_name.
573 # by combining the work_dir with the job_file_name.
573 job_file = Unicode(u'')
574 job_file = Unicode(u'')
574 # The hostname of the scheduler to submit the job to
575 # The hostname of the scheduler to submit the job to
575 scheduler = Str('', config=True)
576 scheduler = Str('', config=True)
576 job_cmd = Str(find_job_cmd(), config=True)
577 job_cmd = Str(find_job_cmd(), config=True)
577
578
578 def __init__(self, work_dir, parent=None, name=None, config=None):
579 def __init__(self, work_dir, parent=None, name=None, config=None):
579 super(WindowsHPCLauncher, self).__init__(
580 super(WindowsHPCLauncher, self).__init__(
580 work_dir, parent, name, config
581 work_dir, parent, name, config
581 )
582 )
582
583
583 @property
584 @property
584 def job_file(self):
585 def job_file(self):
585 return os.path.join(self.work_dir, self.job_file_name)
586 return os.path.join(self.work_dir, self.job_file_name)
586
587
587 def write_job_file(self, n):
588 def write_job_file(self, n):
588 raise NotImplementedError("Implement write_job_file in a subclass.")
589 raise NotImplementedError("Implement write_job_file in a subclass.")
589
590
590 def find_args(self):
591 def find_args(self):
591 return ['job.exe']
592 return ['job.exe']
592
593
593 def parse_job_id(self, output):
594 def parse_job_id(self, output):
594 """Take the output of the submit command and return the job id."""
595 """Take the output of the submit command and return the job id."""
595 m = re.search(self.job_id_regexp, output)
596 m = re.search(self.job_id_regexp, output)
596 if m is not None:
597 if m is not None:
597 job_id = m.group()
598 job_id = m.group()
598 else:
599 else:
599 raise LauncherError("Job id couldn't be determined: %s" % output)
600 raise LauncherError("Job id couldn't be determined: %s" % output)
600 self.job_id = job_id
601 self.job_id = job_id
601 log.msg('Job started with job id: %r' % job_id)
602 log.msg('Job started with job id: %r' % job_id)
602 return job_id
603 return job_id
603
604
604 @inlineCallbacks
605 @inlineCallbacks
605 def start(self, n):
606 def start(self, n):
606 """Start n copies of the process using the Win HPC job scheduler."""
607 """Start n copies of the process using the Win HPC job scheduler."""
607 self.write_job_file(n)
608 self.write_job_file(n)
608 args = [
609 args = [
609 'submit',
610 'submit',
610 '/jobfile:%s' % self.job_file,
611 '/jobfile:%s' % self.job_file,
611 '/scheduler:%s' % self.scheduler
612 '/scheduler:%s' % self.scheduler
612 ]
613 ]
613 log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
614 log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
614 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
615 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
615 output = yield getProcessOutput(str(self.job_cmd),
616 output = yield getProcessOutput(str(self.job_cmd),
616 [str(a) for a in args],
617 [str(a) for a in args],
617 env=dict((str(k),str(v)) for k,v in os.environ.items()),
618 env=dict((str(k),str(v)) for k,v in os.environ.items()),
618 path=self.work_dir
619 path=self.work_dir
619 )
620 )
620 job_id = self.parse_job_id(output)
621 job_id = self.parse_job_id(output)
621 self.notify_start(job_id)
622 self.notify_start(job_id)
622 defer.returnValue(job_id)
623 defer.returnValue(job_id)
623
624
624 @inlineCallbacks
625 @inlineCallbacks
625 def stop(self):
626 def stop(self):
626 args = [
627 args = [
627 'cancel',
628 'cancel',
628 self.job_id,
629 self.job_id,
629 '/scheduler:%s' % self.scheduler
630 '/scheduler:%s' % self.scheduler
630 ]
631 ]
631 log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
632 log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
632 try:
633 try:
633 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
634 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
634 output = yield getProcessOutput(str(self.job_cmd),
635 output = yield getProcessOutput(str(self.job_cmd),
635 [str(a) for a in args],
636 [str(a) for a in args],
636 env=dict((str(k),str(v)) for k,v in os.environ.items()),
637 env=dict((str(k),str(v)) for k,v in os.environ.items()),
637 path=self.work_dir
638 path=self.work_dir
638 )
639 )
639 except:
640 except:
640 output = 'The job already appears to be stoppped: %r' % self.job_id
641 output = 'The job already appears to be stoppped: %r' % self.job_id
641 self.notify_stop(output) # Pass the output of the kill cmd
642 self.notify_stop(output) # Pass the output of the kill cmd
642 defer.returnValue(output)
643 defer.returnValue(output)
643
644
644
645
645 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
646 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
646
647
647 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
648 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
648 extra_args = List([], config=False)
649 extra_args = List([], config=False)
649
650
650 def write_job_file(self, n):
651 def write_job_file(self, n):
651 job = IPControllerJob(self)
652 job = IPControllerJob(self)
652
653
653 t = IPControllerTask(self)
654 t = IPControllerTask(self)
654 # The tasks work directory is *not* the actual work directory of
655 # The tasks work directory is *not* the actual work directory of
655 # the controller. It is used as the base path for the stdout/stderr
656 # the controller. It is used as the base path for the stdout/stderr
656 # files that the scheduler redirects to.
657 # files that the scheduler redirects to.
657 t.work_directory = self.cluster_dir
658 t.work_directory = self.cluster_dir
658 # Add the --cluster-dir and from self.start().
659 # Add the --cluster-dir and from self.start().
659 t.controller_args.extend(self.extra_args)
660 t.controller_args.extend(self.extra_args)
660 job.add_task(t)
661 job.add_task(t)
661
662
662 log.msg("Writing job description file: %s" % self.job_file)
663 log.msg("Writing job description file: %s" % self.job_file)
663 job.write(self.job_file)
664 job.write(self.job_file)
664
665
665 @property
666 @property
666 def job_file(self):
667 def job_file(self):
667 return os.path.join(self.cluster_dir, self.job_file_name)
668 return os.path.join(self.cluster_dir, self.job_file_name)
668
669
669 def start(self, cluster_dir):
670 def start(self, cluster_dir):
670 """Start the controller by cluster_dir."""
671 """Start the controller by cluster_dir."""
671 self.extra_args = ['--cluster-dir', cluster_dir]
672 self.extra_args = ['--cluster-dir', cluster_dir]
672 self.cluster_dir = unicode(cluster_dir)
673 self.cluster_dir = unicode(cluster_dir)
673 return super(WindowsHPCControllerLauncher, self).start(1)
674 return super(WindowsHPCControllerLauncher, self).start(1)
674
675
675
676
676 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
677 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
677
678
678 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
679 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
679 extra_args = List([], config=False)
680 extra_args = List([], config=False)
680
681
681 def write_job_file(self, n):
682 def write_job_file(self, n):
682 job = IPEngineSetJob(self)
683 job = IPEngineSetJob(self)
683
684
684 for i in range(n):
685 for i in range(n):
685 t = IPEngineTask(self)
686 t = IPEngineTask(self)
686 # The tasks work directory is *not* the actual work directory of
687 # The tasks work directory is *not* the actual work directory of
687 # the engine. It is used as the base path for the stdout/stderr
688 # the engine. It is used as the base path for the stdout/stderr
688 # files that the scheduler redirects to.
689 # files that the scheduler redirects to.
689 t.work_directory = self.cluster_dir
690 t.work_directory = self.cluster_dir
690 # Add the --cluster-dir and from self.start().
691 # Add the --cluster-dir and from self.start().
691 t.engine_args.extend(self.extra_args)
692 t.engine_args.extend(self.extra_args)
692 job.add_task(t)
693 job.add_task(t)
693
694
694 log.msg("Writing job description file: %s" % self.job_file)
695 log.msg("Writing job description file: %s" % self.job_file)
695 job.write(self.job_file)
696 job.write(self.job_file)
696
697
697 @property
698 @property
698 def job_file(self):
699 def job_file(self):
699 return os.path.join(self.cluster_dir, self.job_file_name)
700 return os.path.join(self.cluster_dir, self.job_file_name)
700
701
701 def start(self, n, cluster_dir):
702 def start(self, n, cluster_dir):
702 """Start the controller by cluster_dir."""
703 """Start the controller by cluster_dir."""
703 self.extra_args = ['--cluster-dir', cluster_dir]
704 self.extra_args = ['--cluster-dir', cluster_dir]
704 self.cluster_dir = unicode(cluster_dir)
705 self.cluster_dir = unicode(cluster_dir)
705 return super(WindowsHPCEngineSetLauncher, self).start(n)
706 return super(WindowsHPCEngineSetLauncher, self).start(n)
706
707
707
708
708 #-----------------------------------------------------------------------------
709 #-----------------------------------------------------------------------------
709 # Batch (PBS) system launchers
710 # Batch (PBS) system launchers
710 #-----------------------------------------------------------------------------
711 #-----------------------------------------------------------------------------
711
712
713 # TODO: Get PBS launcher working again.
712
714
713 class BatchSystemLauncher(BaseLauncher):
715 class BatchSystemLauncher(BaseLauncher):
714 """Launch an external process using a batch system.
716 """Launch an external process using a batch system.
715
717
716 This class is designed to work with UNIX batch systems like PBS, LSF,
718 This class is designed to work with UNIX batch systems like PBS, LSF,
717 GridEngine, etc. The overall model is that there are different commands
719 GridEngine, etc. The overall model is that there are different commands
718 like qsub, qdel, etc. that handle the starting and stopping of the process.
720 like qsub, qdel, etc. that handle the starting and stopping of the process.
719
721
720 This class also has the notion of a batch script. The ``batch_template``
722 This class also has the notion of a batch script. The ``batch_template``
721 attribute can be set to a string that is a template for the batch script.
723 attribute can be set to a string that is a template for the batch script.
722 This template is instantiated using Itpl. Thus the template can use
724 This template is instantiated using Itpl. Thus the template can use
723 ${n} fot the number of instances. Subclasses can add additional variables
725 ${n} fot the number of instances. Subclasses can add additional variables
724 to the template dict.
726 to the template dict.
725 """
727 """
726
728
727 # Subclasses must fill these in. See PBSEngineSet
729 # Subclasses must fill these in. See PBSEngineSet
728 # The name of the command line program used to submit jobs.
730 # The name of the command line program used to submit jobs.
729 submit_command = Str('', config=True)
731 submit_command = Str('', config=True)
730 # The name of the command line program used to delete jobs.
732 # The name of the command line program used to delete jobs.
731 delete_command = Str('', config=True)
733 delete_command = Str('', config=True)
732 # A regular expression used to get the job id from the output of the
734 # A regular expression used to get the job id from the output of the
733 # submit_command.
735 # submit_command.
734 job_id_regexp = Str('', config=True)
736 job_id_regexp = Str('', config=True)
735 # The string that is the batch script template itself.
737 # The string that is the batch script template itself.
736 batch_template = Str('', config=True)
738 batch_template = Str('', config=True)
737 # The filename of the instantiated batch script.
739 # The filename of the instantiated batch script.
738 batch_file_name = Unicode(u'batch_script', config=True)
740 batch_file_name = Unicode(u'batch_script', config=True)
739 # The full path to the instantiated batch script.
741 # The full path to the instantiated batch script.
740 batch_file = Unicode(u'')
742 batch_file = Unicode(u'')
741
743
742 def __init__(self, work_dir, parent=None, name=None, config=None):
744 def __init__(self, work_dir, parent=None, name=None, config=None):
743 super(BatchSystemLauncher, self).__init__(
745 super(BatchSystemLauncher, self).__init__(
744 work_dir, parent, name, config
746 work_dir, parent, name, config
745 )
747 )
746 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
748 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
747 self.context = {}
749 self.context = {}
748
750
749 def parse_job_id(self, output):
751 def parse_job_id(self, output):
750 """Take the output of the submit command and return the job id."""
752 """Take the output of the submit command and return the job id."""
751 m = re.match(self.job_id_regexp, output)
753 m = re.match(self.job_id_regexp, output)
752 if m is not None:
754 if m is not None:
753 job_id = m.group()
755 job_id = m.group()
754 else:
756 else:
755 raise LauncherError("Job id couldn't be determined: %s" % output)
757 raise LauncherError("Job id couldn't be determined: %s" % output)
756 self.job_id = job_id
758 self.job_id = job_id
757 log.msg('Job started with job id: %r' % job_id)
759 log.msg('Job started with job id: %r' % job_id)
758 return job_id
760 return job_id
759
761
760 def write_batch_script(self, n):
762 def write_batch_script(self, n):
761 """Instantiate and write the batch script to the work_dir."""
763 """Instantiate and write the batch script to the work_dir."""
762 self.context['n'] = n
764 self.context['n'] = n
763 script_as_string = Itpl.itplns(self.batch_template, self.context)
765 script_as_string = Itpl.itplns(self.batch_template, self.context)
764 log.msg('Writing instantiated batch script: %s' % self.batch_file)
766 log.msg('Writing instantiated batch script: %s' % self.batch_file)
765 f = open(self.batch_file, 'w')
767 f = open(self.batch_file, 'w')
766 f.write(script_as_string)
768 f.write(script_as_string)
767 f.close()
769 f.close()
768
770
769 @inlineCallbacks
771 @inlineCallbacks
770 def start(self, n):
772 def start(self, n):
771 """Start n copies of the process using a batch system."""
773 """Start n copies of the process using a batch system."""
772 self.write_batch_script(n)
774 self.write_batch_script(n)
773 output = yield getProcessOutput(self.submit_command,
775 output = yield getProcessOutput(self.submit_command,
774 [self.batch_file], env=os.environ)
776 [self.batch_file], env=os.environ)
775 job_id = self.parse_job_id(output)
777 job_id = self.parse_job_id(output)
776 self.notify_start(job_id)
778 self.notify_start(job_id)
777 defer.returnValue(job_id)
779 defer.returnValue(job_id)
778
780
779 @inlineCallbacks
781 @inlineCallbacks
780 def stop(self):
782 def stop(self):
781 output = yield getProcessOutput(self.delete_command,
783 output = yield getProcessOutput(self.delete_command,
782 [self.job_id], env=os.environ
784 [self.job_id], env=os.environ
783 )
785 )
784 self.notify_stop(output) # Pass the output of the kill cmd
786 self.notify_stop(output) # Pass the output of the kill cmd
785 defer.returnValue(output)
787 defer.returnValue(output)
786
788
787
789
788 class PBSLauncher(BatchSystemLauncher):
790 class PBSLauncher(BatchSystemLauncher):
789 """A BatchSystemLauncher subclass for PBS."""
791 """A BatchSystemLauncher subclass for PBS."""
790
792
791 submit_command = Str('qsub', config=True)
793 submit_command = Str('qsub', config=True)
792 delete_command = Str('qdel', config=True)
794 delete_command = Str('qdel', config=True)
793 job_id_regexp = Str('\d+', config=True)
795 job_id_regexp = Str(r'\d+', config=True)
794 batch_template = Str('', config=True)
796 batch_template = Str('', config=True)
795 batch_file_name = Unicode(u'pbs_batch_script', config=True)
797 batch_file_name = Unicode(u'pbs_batch_script', config=True)
796 batch_file = Unicode(u'')
798 batch_file = Unicode(u'')
797
799
798
800
799 class PBSControllerLauncher(PBSLauncher):
801 class PBSControllerLauncher(PBSLauncher):
800 """Launch a controller using PBS."""
802 """Launch a controller using PBS."""
801
803
802 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
804 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
803
805
804 def start(self, cluster_dir):
806 def start(self, cluster_dir):
805 """Start the controller by profile or cluster_dir."""
807 """Start the controller by profile or cluster_dir."""
806 # Here we save profile and cluster_dir in the context so they
808 # Here we save profile and cluster_dir in the context so they
807 # can be used in the batch script template as ${profile} and
809 # can be used in the batch script template as ${profile} and
808 # ${cluster_dir}
810 # ${cluster_dir}
809 self.context['cluster_dir'] = cluster_dir
811 self.context['cluster_dir'] = cluster_dir
810 self.cluster_dir = unicode(cluster_dir)
812 self.cluster_dir = unicode(cluster_dir)
811 log.msg("Starting PBSControllerLauncher: %r" % self.args)
813 log.msg("Starting PBSControllerLauncher: %r" % self.args)
812 return super(PBSControllerLauncher, self).start(1)
814 return super(PBSControllerLauncher, self).start(1)
813
815
814
816
815 class PBSEngineSetLauncher(PBSLauncher):
817 class PBSEngineSetLauncher(PBSLauncher):
816
818
817 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
819 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
818
820
819 def start(self, n, cluster_dir):
821 def start(self, n, cluster_dir):
820 """Start n engines by profile or cluster_dir."""
822 """Start n engines by profile or cluster_dir."""
821 self.program_args.extend(['--cluster-dir', cluster_dir])
823 self.program_args.extend(['--cluster-dir', cluster_dir])
822 self.cluster_dir = unicode(cluster_dir)
824 self.cluster_dir = unicode(cluster_dir)
823 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
825 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
824 return super(PBSEngineSetLauncher, self).start(n)
826 return super(PBSEngineSetLauncher, self).start(n)
825
827
826
828
827 #-----------------------------------------------------------------------------
829 #-----------------------------------------------------------------------------
828 # A launcher for ipcluster itself!
830 # A launcher for ipcluster itself!
829 #-----------------------------------------------------------------------------
831 #-----------------------------------------------------------------------------
830
832
831
833
832 def find_ipcluster_cmd():
834 def find_ipcluster_cmd():
833 """Find the command line ipcluster program in a cross platform way."""
835 """Find the command line ipcluster program in a cross platform way."""
834 if sys.platform == 'win32':
836 if sys.platform == 'win32':
835 # This logic is needed because the ipcluster script doesn't
837 # This logic is needed because the ipcluster script doesn't
836 # always get installed in the same way or in the same location.
838 # always get installed in the same way or in the same location.
837 from IPython.kernel import ipclusterapp
839 from IPython.kernel import ipclusterapp
838 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
840 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
839 # The -u option here turns on unbuffered output, which is required
841 # The -u option here turns on unbuffered output, which is required
840 # on Win32 to prevent wierd conflict and problems with Twisted.
842 # on Win32 to prevent wierd conflict and problems with Twisted.
841 # Also, use sys.executable to make sure we are picking up the
843 # Also, use sys.executable to make sure we are picking up the
842 # right python exe.
844 # right python exe.
843 cmd = [sys.executable, '-u', script_location]
845 cmd = [sys.executable, '-u', script_location]
844 else:
846 else:
845 # ipcontroller has to be on the PATH in this case.
847 # ipcontroller has to be on the PATH in this case.
846 cmd = ['ipcluster']
848 cmd = ['ipcluster']
847 return cmd
849 return cmd
848
850
849
851
850 class IPClusterLauncher(LocalProcessLauncher):
852 class IPClusterLauncher(LocalProcessLauncher):
851 """Launch the ipcluster program in an external process."""
853 """Launch the ipcluster program in an external process."""
852
854
853 ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
855 ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
854 # Command line arguments to pass to ipcluster.
856 # Command line arguments to pass to ipcluster.
855 ipcluster_args = List(
857 ipcluster_args = List(
856 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
858 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
857 ipcluster_subcommand = Str('start')
859 ipcluster_subcommand = Str('start')
858 ipcluster_n = Int(2)
860 ipcluster_n = Int(2)
859
861
860 def find_args(self):
862 def find_args(self):
861 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
863 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
862 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
864 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
863
865
864 def start(self):
866 def start(self):
865 log.msg("Starting ipcluster: %r" % self.args)
867 log.msg("Starting ipcluster: %r" % self.args)
866 return super(IPClusterLauncher, self).start()
868 return super(IPClusterLauncher, self).start()
867
869
General Comments 0
You need to be logged in to leave comments. Login now