##// END OF EJS Templates
launcher updates for PBS
MinRK -
Show More
@@ -1,184 +1,192 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Select which launchers to use
6 # Select which launchers to use
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # This allows you to control what method is used to start the controller
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
10 # and engines. The following methods are currently supported:
11 # - Start as a regular process on localhost.
11 # - Start as a regular process on localhost.
12 # - Start using mpiexec.
12 # - Start using mpiexec.
13 # - Start using the Windows HPC Server 2008 scheduler
13 # - Start using the Windows HPC Server 2008 scheduler
14 # - Start using PBS
14 # - Start using PBS
15 # - Start using SSH
15 # - Start using SSH
16
16
17
17
18 # The selected launchers can be configured below.
18 # The selected launchers can be configured below.
19
19
20 # Options are:
20 # Options are:
21 # - LocalControllerLauncher
21 # - LocalControllerLauncher
22 # - MPIExecControllerLauncher
22 # - MPIExecControllerLauncher
23 # - PBSControllerLauncher
23 # - PBSControllerLauncher
24 # - WindowsHPCControllerLauncher
24 # - WindowsHPCControllerLauncher
25 # c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
25 # c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
26
26
27 # Options are:
27 # Options are:
28 # - LocalEngineSetLauncher
28 # - LocalEngineSetLauncher
29 # - MPIExecEngineSetLauncher
29 # - MPIExecEngineSetLauncher
30 # - PBSEngineSetLauncher
30 # - PBSEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
32 # c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
32 # c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Global configuration
35 # Global configuration
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 # The default number of engines that will be started. This is overridden by
38 # The default number of engines that will be started. This is overridden by
39 # the -n command line option: "ipcluster start -n 4"
39 # the -n command line option: "ipcluster start -n 4"
40 # c.Global.n = 2
40 # c.Global.n = 2
41
41
42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
43 # c.Global.log_to_file = False
43 # c.Global.log_to_file = False
44
44
45 # Remove old logs from cluster_dir/log before starting.
45 # Remove old logs from cluster_dir/log before starting.
46 # c.Global.clean_logs = True
46 # c.Global.clean_logs = True
47
47
48 # The working directory for the process. The application will use os.chdir
48 # The working directory for the process. The application will use os.chdir
49 # to change to this directory before starting.
49 # to change to this directory before starting.
50 # c.Global.work_dir = os.getcwd()
50 # c.Global.work_dir = os.getcwd()
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Local process launchers
54 # Local process launchers
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 # The command line arguments to call the controller with.
57 # The command line arguments to call the controller with.
58 # c.LocalControllerLauncher.controller_args = \
58 # c.LocalControllerLauncher.controller_args = \
59 # ['--log-to-file','--log-level', '40']
59 # ['--log-to-file','--log-level', '40']
60
60
61 # The working directory for the controller
61 # The working directory for the controller
62 # c.LocalEngineSetLauncher.work_dir = u''
62 # c.LocalEngineSetLauncher.work_dir = u''
63
63
64 # Command line argument passed to the engines.
64 # Command line argument passed to the engines.
65 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
65 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # MPIExec launchers
68 # MPIExec launchers
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70
70
71 # The mpiexec/mpirun command to use in started the controller.
71 # The mpiexec/mpirun command to use in started the controller.
72 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
72 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
73
73
74 # Additional arguments to pass to the actual mpiexec command.
74 # Additional arguments to pass to the actual mpiexec command.
75 # c.MPIExecControllerLauncher.mpi_args = []
75 # c.MPIExecControllerLauncher.mpi_args = []
76
76
77 # The command line argument to call the controller with.
77 # The command line argument to call the controller with.
78 # c.MPIExecControllerLauncher.controller_args = \
78 # c.MPIExecControllerLauncher.controller_args = \
79 # ['--log-to-file','--log-level', '40']
79 # ['--log-to-file','--log-level', '40']
80
80
81
81
82 # The mpiexec/mpirun command to use in started the controller.
82 # The mpiexec/mpirun command to use in started the controller.
83 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
83 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
84
84
85 # Additional arguments to pass to the actual mpiexec command.
85 # Additional arguments to pass to the actual mpiexec command.
86 # c.MPIExecEngineSetLauncher.mpi_args = []
86 # c.MPIExecEngineSetLauncher.mpi_args = []
87
87
88 # Command line argument passed to the engines.
88 # Command line argument passed to the engines.
89 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
89 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
90
90
91 # The default number of engines to start if not given elsewhere.
91 # The default number of engines to start if not given elsewhere.
92 # c.MPIExecEngineSetLauncher.n = 1
92 # c.MPIExecEngineSetLauncher.n = 1
93
93
94 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
95 # SSH launchers
95 # SSH launchers
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97
97
98 # Todo
98 # Todo
99
99
100
100
101 #-----------------------------------------------------------------------------
101 #-----------------------------------------------------------------------------
102 # Unix batch (PBS) schedulers launchers
102 # Unix batch (PBS) schedulers launchers
103 #-----------------------------------------------------------------------------
103 #-----------------------------------------------------------------------------
104
104
105 # The command line program to use to submit a PBS job.
105 # The command line program to use to submit a PBS job.
106 # c.PBSControllerLauncher.submit_command = 'qsub'
106 # c.PBSControllerLauncher.submit_command = 'qsub'
107
107
108 # The command line program to use to delete a PBS job.
108 # The command line program to use to delete a PBS job.
109 # c.PBSControllerLauncher.delete_command = 'qdel'
109 # c.PBSControllerLauncher.delete_command = 'qdel'
110
110
111 # A regular expression that takes the output of qsub and find the job id.
111 # A regular expression that takes the output of qsub and find the job id.
112 # c.PBSControllerLauncher.job_id_regexp = r'\d+'
112 # c.PBSControllerLauncher.job_id_regexp = r'\d+'
113
113
114 # The batch submission script used to start the controller. This is where
114 # The batch submission script used to start the controller. This is where
115 # environment variables would be setup, etc. This string is interpolated using
115 # environment variables would be setup, etc. This string is interpolated using
116 # the Itpl module in IPython.external. Basically, you can use ${n} for the
116 # the Itpl module in IPython.external. Basically, you can use ${n} for the
117 # number of engine and ${cluster_dir} for the cluster_dir.
117 # number of engine and ${cluster_dir} for the cluster_dir.
118 # c.PBSControllerLauncher.batch_template = """"""
118 # c.PBSControllerLauncher.batch_template = """
119 # #PBS -l nprocs=$n
120 #
121 # ipcontrollerz --cluster-dir $cluster_dir
122 # """
119
123
120 # The name of the instantiated batch script that will actually be used to
124 # The name of the instantiated batch script that will actually be used to
121 # submit the job. This will be written to the cluster directory.
125 # submit the job. This will be written to the cluster directory.
122 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
126 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
123
127
124
128
125 # The command line program to use to submit a PBS job.
129 # The command line program to use to submit a PBS job.
126 # c.PBSEngineSetLauncher.submit_command = 'qsub'
130 # c.PBSEngineSetLauncher.submit_command = 'qsub'
127
131
128 # The command line program to use to delete a PBS job.
132 # The command line program to use to delete a PBS job.
129 # c.PBSEngineSetLauncher.delete_command = 'qdel'
133 # c.PBSEngineSetLauncher.delete_command = 'qdel'
130
134
131 # A regular expression that takes the output of qsub and find the job id.
135 # A regular expression that takes the output of qsub and find the job id.
132 # c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
136 # c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
133
137
134 # The batch submission script used to start the engines. This is where
138 # The batch submission script used to start the engines. This is where
135 # environment variables would be setup, etc. This string is interpolated using
139 # environment variables would be setup, etc. This string is interpolated using
136 # the Itpl module in IPython.external. Basically, you can use ${n} for the
140 # the Itpl module in IPython.external. Basically, you can use ${n} for the
137 # number of engine and ${cluster_dir} for the cluster_dir.
141 # number of engine and ${cluster_dir} for the cluster_dir.
138 # c.PBSEngineSetLauncher.batch_template = """"""
142 # c.PBSEngineSetLauncher.batch_template = """
143 # #PBS -l nprocs=$n
144 #
145 # ipenginez --cluster-dir $cluster_dir$s
146 # """
139
147
140 # The name of the instantiated batch script that will actually be used to
148 # The name of the instantiated batch script that will actually be used to
141 # submit the job. This will be written to the cluster directory.
149 # submit the job. This will be written to the cluster directory.
142 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
150 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
143
151
144 #-----------------------------------------------------------------------------
152 #-----------------------------------------------------------------------------
145 # Windows HPC Server 2008 launcher configuration
153 # Windows HPC Server 2008 launcher configuration
146 #-----------------------------------------------------------------------------
154 #-----------------------------------------------------------------------------
147
155
148 # c.IPControllerJob.job_name = 'IPController'
156 # c.IPControllerJob.job_name = 'IPController'
149 # c.IPControllerJob.is_exclusive = False
157 # c.IPControllerJob.is_exclusive = False
150 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
158 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
151 # c.IPControllerJob.priority = 'Highest'
159 # c.IPControllerJob.priority = 'Highest'
152 # c.IPControllerJob.requested_nodes = ''
160 # c.IPControllerJob.requested_nodes = ''
153 # c.IPControllerJob.project = 'MyProject'
161 # c.IPControllerJob.project = 'MyProject'
154
162
155 # c.IPControllerTask.task_name = 'IPController'
163 # c.IPControllerTask.task_name = 'IPController'
156 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
164 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
157 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
165 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
158 # c.IPControllerTask.environment_variables = {}
166 # c.IPControllerTask.environment_variables = {}
159
167
160 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
168 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
161 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
169 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
162
170
163
171
164 # c.IPEngineSetJob.job_name = 'IPEngineSet'
172 # c.IPEngineSetJob.job_name = 'IPEngineSet'
165 # c.IPEngineSetJob.is_exclusive = False
173 # c.IPEngineSetJob.is_exclusive = False
166 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
174 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
167 # c.IPEngineSetJob.priority = 'Highest'
175 # c.IPEngineSetJob.priority = 'Highest'
168 # c.IPEngineSetJob.requested_nodes = ''
176 # c.IPEngineSetJob.requested_nodes = ''
169 # c.IPEngineSetJob.project = 'MyProject'
177 # c.IPEngineSetJob.project = 'MyProject'
170
178
171 # c.IPEngineTask.task_name = 'IPEngine'
179 # c.IPEngineTask.task_name = 'IPEngine'
172 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
180 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
173 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
181 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
174 # c.IPEngineTask.environment_variables = {}
182 # c.IPEngineTask.environment_variables = {}
175
183
176 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
184 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
177 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
185 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
178
186
179
187
180
188
181
189
182
190
183
191
184
192
@@ -1,592 +1,592 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 import zmq
24 import zmq
25 from zmq.eventloop import ioloop
25 from zmq.eventloop import ioloop
26
26
27 from IPython.external.argparse import ArgumentParser, SUPPRESS
27 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 from IPython.utils.importstring import import_item
28 from IPython.utils.importstring import import_item
29 from IPython.zmq.parallel.clusterdir import (
29 from IPython.zmq.parallel.clusterdir import (
30 ApplicationWithClusterDir, ClusterDirConfigLoader,
30 ApplicationWithClusterDir, ClusterDirConfigLoader,
31 ClusterDirError, PIDFileError
31 ClusterDirError, PIDFileError
32 )
32 )
33
33
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Module level variables
36 # Module level variables
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
40 default_config_file_name = u'ipclusterz_config.py'
40 default_config_file_name = u'ipclusterz_config.py'
41
41
42
42
43 _description = """\
43 _description = """\
44 Start an IPython cluster for parallel computing.\n\n
44 Start an IPython cluster for parallel computing.\n\n
45
45
46 An IPython cluster consists of 1 controller and 1 or more engines.
46 An IPython cluster consists of 1 controller and 1 or more engines.
47 This command automates the startup of these processes using a wide
47 This command automates the startup of these processes using a wide
48 range of startup methods (SSH, local processes, PBS, mpiexec,
48 range of startup methods (SSH, local processes, PBS, mpiexec,
49 Windows HPC Server 2008). To start a cluster with 4 engines on your
49 Windows HPC Server 2008). To start a cluster with 4 engines on your
50 local host simply do 'ipclusterz start -n 4'. For more complex usage
50 local host simply do 'ipclusterz start -n 4'. For more complex usage
51 you will typically do 'ipclusterz create -p mycluster', then edit
51 you will typically do 'ipclusterz create -p mycluster', then edit
52 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
52 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
53 """
53 """
54
54
55
55
56 # Exit codes for ipcluster
56 # Exit codes for ipcluster
57
57
58 # This will be the exit code if the ipcluster appears to be running because
58 # This will be the exit code if the ipcluster appears to be running because
59 # a .pid file exists
59 # a .pid file exists
60 ALREADY_STARTED = 10
60 ALREADY_STARTED = 10
61
61
62
62
63 # This will be the exit code if ipcluster stop is run, but there is not .pid
63 # This will be the exit code if ipcluster stop is run, but there is not .pid
64 # file to be found.
64 # file to be found.
65 ALREADY_STOPPED = 11
65 ALREADY_STOPPED = 11
66
66
67 # This will be the exit code if ipcluster engines is run, but there is not .pid
67 # This will be the exit code if ipcluster engines is run, but there is not .pid
68 # file to be found.
68 # file to be found.
69 NO_CLUSTER = 12
69 NO_CLUSTER = 12
70
70
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Command line options
73 # Command line options
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
77 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
78
78
79 def _add_arguments(self):
79 def _add_arguments(self):
80 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
80 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
81 # its defaults on self.parser. Instead, we will put those on
81 # its defaults on self.parser. Instead, we will put those on
82 # default options on our subparsers.
82 # default options on our subparsers.
83
83
84 # This has all the common options that all subcommands use
84 # This has all the common options that all subcommands use
85 parent_parser1 = ArgumentParser(
85 parent_parser1 = ArgumentParser(
86 add_help=False,
86 add_help=False,
87 argument_default=SUPPRESS
87 argument_default=SUPPRESS
88 )
88 )
89 self._add_ipython_dir(parent_parser1)
89 self._add_ipython_dir(parent_parser1)
90 self._add_log_level(parent_parser1)
90 self._add_log_level(parent_parser1)
91
91
92 # This has all the common options that other subcommands use
92 # This has all the common options that other subcommands use
93 parent_parser2 = ArgumentParser(
93 parent_parser2 = ArgumentParser(
94 add_help=False,
94 add_help=False,
95 argument_default=SUPPRESS
95 argument_default=SUPPRESS
96 )
96 )
97 self._add_cluster_profile(parent_parser2)
97 self._add_cluster_profile(parent_parser2)
98 self._add_cluster_dir(parent_parser2)
98 self._add_cluster_dir(parent_parser2)
99 self._add_work_dir(parent_parser2)
99 self._add_work_dir(parent_parser2)
100 paa = parent_parser2.add_argument
100 paa = parent_parser2.add_argument
101 paa('--log-to-file',
101 paa('--log-to-file',
102 action='store_true', dest='Global.log_to_file',
102 action='store_true', dest='Global.log_to_file',
103 help='Log to a file in the log directory (default is stdout)')
103 help='Log to a file in the log directory (default is stdout)')
104
104
105 # Create the object used to create the subparsers.
105 # Create the object used to create the subparsers.
106 subparsers = self.parser.add_subparsers(
106 subparsers = self.parser.add_subparsers(
107 dest='Global.subcommand',
107 dest='Global.subcommand',
108 title='ipcluster subcommands',
108 title='ipcluster subcommands',
109 description=
109 description=
110 """ipcluster has a variety of subcommands. The general way of
110 """ipcluster has a variety of subcommands. The general way of
111 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
111 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
112 on a particular subcommand do 'ipclusterz <cmd> -h'."""
112 on a particular subcommand do 'ipclusterz <cmd> -h'."""
113 # help="For more help, type 'ipclusterz <cmd> -h'",
113 # help="For more help, type 'ipclusterz <cmd> -h'",
114 )
114 )
115
115
116 # The "list" subcommand parser
116 # The "list" subcommand parser
117 parser_list = subparsers.add_parser(
117 parser_list = subparsers.add_parser(
118 'list',
118 'list',
119 parents=[parent_parser1],
119 parents=[parent_parser1],
120 argument_default=SUPPRESS,
120 argument_default=SUPPRESS,
121 help="List all clusters in cwd and ipython_dir.",
121 help="List all clusters in cwd and ipython_dir.",
122 description=
122 description=
123 """List all available clusters, by cluster directory, that can
123 """List all available clusters, by cluster directory, that can
124 be found in the current working directly or in the ipython
124 be found in the current working directly or in the ipython
125 directory. Cluster directories are named using the convention
125 directory. Cluster directories are named using the convention
126 'cluster_<profile>'."""
126 'clusterz_<profile>'."""
127 )
127 )
128
128
129 # The "create" subcommand parser
129 # The "create" subcommand parser
130 parser_create = subparsers.add_parser(
130 parser_create = subparsers.add_parser(
131 'create',
131 'create',
132 parents=[parent_parser1, parent_parser2],
132 parents=[parent_parser1, parent_parser2],
133 argument_default=SUPPRESS,
133 argument_default=SUPPRESS,
134 help="Create a new cluster directory.",
134 help="Create a new cluster directory.",
135 description=
135 description=
136 """Create an ipython cluster directory by its profile name or
136 """Create an ipython cluster directory by its profile name or
137 cluster directory path. Cluster directories contain
137 cluster directory path. Cluster directories contain
138 configuration, log and security related files and are named
138 configuration, log and security related files and are named
139 using the convention 'cluster_<profile>'. By default they are
139 using the convention 'clusterz_<profile>'. By default they are
140 located in your ipython directory. Once created, you will
140 located in your ipython directory. Once created, you will
141 probably need to edit the configuration files in the cluster
141 probably need to edit the configuration files in the cluster
142 directory to configure your cluster. Most users will create a
142 directory to configure your cluster. Most users will create a
143 cluster directory by profile name,
143 cluster directory by profile name,
144 'ipclusterz create -p mycluster', which will put the directory
144 'ipclusterz create -p mycluster', which will put the directory
145 in '<ipython_dir>/cluster_mycluster'.
145 in '<ipython_dir>/clusterz_mycluster'.
146 """
146 """
147 )
147 )
148 paa = parser_create.add_argument
148 paa = parser_create.add_argument
149 paa('--reset-config',
149 paa('--reset-config',
150 dest='Global.reset_config', action='store_true',
150 dest='Global.reset_config', action='store_true',
151 help=
151 help=
152 """Recopy the default config files to the cluster directory.
152 """Recopy the default config files to the cluster directory.
153 You will loose any modifications you have made to these files.""")
153 You will loose any modifications you have made to these files.""")
154
154
155 # The "start" subcommand parser
155 # The "start" subcommand parser
156 parser_start = subparsers.add_parser(
156 parser_start = subparsers.add_parser(
157 'start',
157 'start',
158 parents=[parent_parser1, parent_parser2],
158 parents=[parent_parser1, parent_parser2],
159 argument_default=SUPPRESS,
159 argument_default=SUPPRESS,
160 help="Start a cluster.",
160 help="Start a cluster.",
161 description=
161 description=
162 """Start an ipython cluster by its profile name or cluster
162 """Start an ipython cluster by its profile name or cluster
163 directory. Cluster directories contain configuration, log and
163 directory. Cluster directories contain configuration, log and
164 security related files and are named using the convention
164 security related files and are named using the convention
165 'cluster_<profile>' and should be creating using the 'start'
165 'clusterz_<profile>' and should be creating using the 'start'
166 subcommand of 'ipcluster'. If your cluster directory is in
166 subcommand of 'ipcluster'. If your cluster directory is in
167 the cwd or the ipython directory, you can simply refer to it
167 the cwd or the ipython directory, you can simply refer to it
168 using its profile name, 'ipclusterz start -n 4 -p <profile>`,
168 using its profile name, 'ipclusterz start -n 4 -p <profile>`,
169 otherwise use the '--cluster-dir' option.
169 otherwise use the '--cluster-dir' option.
170 """
170 """
171 )
171 )
172
172
173 paa = parser_start.add_argument
173 paa = parser_start.add_argument
174 paa('-n', '--number',
174 paa('-n', '--number',
175 type=int, dest='Global.n',
175 type=int, dest='Global.n',
176 help='The number of engines to start.',
176 help='The number of engines to start.',
177 metavar='Global.n')
177 metavar='Global.n')
178 paa('--clean-logs',
178 paa('--clean-logs',
179 dest='Global.clean_logs', action='store_true',
179 dest='Global.clean_logs', action='store_true',
180 help='Delete old log flies before starting.')
180 help='Delete old log flies before starting.')
181 paa('--no-clean-logs',
181 paa('--no-clean-logs',
182 dest='Global.clean_logs', action='store_false',
182 dest='Global.clean_logs', action='store_false',
183 help="Don't delete old log flies before starting.")
183 help="Don't delete old log flies before starting.")
184 paa('--daemon',
184 paa('--daemon',
185 dest='Global.daemonize', action='store_true',
185 dest='Global.daemonize', action='store_true',
186 help='Daemonize the ipcluster program. This implies --log-to-file')
186 help='Daemonize the ipcluster program. This implies --log-to-file')
187 paa('--no-daemon',
187 paa('--no-daemon',
188 dest='Global.daemonize', action='store_false',
188 dest='Global.daemonize', action='store_false',
189 help="Dont't daemonize the ipcluster program.")
189 help="Dont't daemonize the ipcluster program.")
190 paa('--delay',
190 paa('--delay',
191 type=float, dest='Global.delay',
191 type=float, dest='Global.delay',
192 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
192 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
193
193
194 # The "stop" subcommand parser
194 # The "stop" subcommand parser
195 parser_stop = subparsers.add_parser(
195 parser_stop = subparsers.add_parser(
196 'stop',
196 'stop',
197 parents=[parent_parser1, parent_parser2],
197 parents=[parent_parser1, parent_parser2],
198 argument_default=SUPPRESS,
198 argument_default=SUPPRESS,
199 help="Stop a running cluster.",
199 help="Stop a running cluster.",
200 description=
200 description=
201 """Stop a running ipython cluster by its profile name or cluster
201 """Stop a running ipython cluster by its profile name or cluster
202 directory. Cluster directories are named using the convention
202 directory. Cluster directories are named using the convention
203 'cluster_<profile>'. If your cluster directory is in
203 'clusterz_<profile>'. If your cluster directory is in
204 the cwd or the ipython directory, you can simply refer to it
204 the cwd or the ipython directory, you can simply refer to it
205 using its profile name, 'ipclusterz stop -p <profile>`, otherwise
205 using its profile name, 'ipclusterz stop -p <profile>`, otherwise
206 use the '--cluster-dir' option.
206 use the '--cluster-dir' option.
207 """
207 """
208 )
208 )
209 paa = parser_stop.add_argument
209 paa = parser_stop.add_argument
210 paa('--signal',
210 paa('--signal',
211 dest='Global.signal', type=int,
211 dest='Global.signal', type=int,
212 help="The signal number to use in stopping the cluster (default=2).",
212 help="The signal number to use in stopping the cluster (default=2).",
213 metavar="Global.signal")
213 metavar="Global.signal")
214
214
215 # the "engines" subcommand parser
215 # the "engines" subcommand parser
216 parser_engines = subparsers.add_parser(
216 parser_engines = subparsers.add_parser(
217 'engines',
217 'engines',
218 parents=[parent_parser1, parent_parser2],
218 parents=[parent_parser1, parent_parser2],
219 argument_default=SUPPRESS,
219 argument_default=SUPPRESS,
220 help="Attach some engines to an existing controller or cluster.",
220 help="Attach some engines to an existing controller or cluster.",
221 description=
221 description=
222 """Start one or more engines to connect to an existing Cluster
222 """Start one or more engines to connect to an existing Cluster
223 by profile name or cluster directory.
223 by profile name or cluster directory.
224 Cluster directories contain configuration, log and
224 Cluster directories contain configuration, log and
225 security related files and are named using the convention
225 security related files and are named using the convention
226 'cluster_<profile>' and should be creating using the 'start'
226 'clusterz_<profile>' and should be creating using the 'start'
227 subcommand of 'ipcluster'. If your cluster directory is in
227 subcommand of 'ipcluster'. If your cluster directory is in
228 the cwd or the ipython directory, you can simply refer to it
228 the cwd or the ipython directory, you can simply refer to it
229 using its profile name, 'ipclusterz engines -n 4 -p <profile>`,
229 using its profile name, 'ipclusterz engines -n 4 -p <profile>`,
230 otherwise use the '--cluster-dir' option.
230 otherwise use the '--cluster-dir' option.
231 """
231 """
232 )
232 )
233 paa = parser_engines.add_argument
233 paa = parser_engines.add_argument
234 paa('-n', '--number',
234 paa('-n', '--number',
235 type=int, dest='Global.n',
235 type=int, dest='Global.n',
236 help='The number of engines to start.',
236 help='The number of engines to start.',
237 metavar='Global.n')
237 metavar='Global.n')
238 paa('--daemon',
238 paa('--daemon',
239 dest='Global.daemonize', action='store_true',
239 dest='Global.daemonize', action='store_true',
240 help='Daemonize the ipcluster program. This implies --log-to-file')
240 help='Daemonize the ipcluster program. This implies --log-to-file')
241 paa('--no-daemon',
241 paa('--no-daemon',
242 dest='Global.daemonize', action='store_false',
242 dest='Global.daemonize', action='store_false',
243 help="Don't daemonize the ipcluster program.")
243 help="Don't daemonize the ipcluster program.")
244
244
245 #-----------------------------------------------------------------------------
245 #-----------------------------------------------------------------------------
246 # Main application
246 # Main application
247 #-----------------------------------------------------------------------------
247 #-----------------------------------------------------------------------------
248
248
249
249
250 class IPClusterApp(ApplicationWithClusterDir):
250 class IPClusterApp(ApplicationWithClusterDir):
251
251
252 name = u'ipclusterz'
252 name = u'ipclusterz'
253 description = _description
253 description = _description
254 usage = None
254 usage = None
255 command_line_loader = IPClusterAppConfigLoader
255 command_line_loader = IPClusterAppConfigLoader
256 default_config_file_name = default_config_file_name
256 default_config_file_name = default_config_file_name
257 default_log_level = logging.INFO
257 default_log_level = logging.INFO
258 auto_create_cluster_dir = False
258 auto_create_cluster_dir = False
259
259
260 def create_default_config(self):
260 def create_default_config(self):
261 super(IPClusterApp, self).create_default_config()
261 super(IPClusterApp, self).create_default_config()
262 self.default_config.Global.controller_launcher = \
262 self.default_config.Global.controller_launcher = \
263 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
263 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
264 self.default_config.Global.engine_launcher = \
264 self.default_config.Global.engine_launcher = \
265 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
265 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
266 self.default_config.Global.n = 2
266 self.default_config.Global.n = 2
267 self.default_config.Global.delay = 2
267 self.default_config.Global.delay = 2
268 self.default_config.Global.reset_config = False
268 self.default_config.Global.reset_config = False
269 self.default_config.Global.clean_logs = True
269 self.default_config.Global.clean_logs = True
270 self.default_config.Global.signal = signal.SIGINT
270 self.default_config.Global.signal = signal.SIGINT
271 self.default_config.Global.daemonize = False
271 self.default_config.Global.daemonize = False
272
272
273 def find_resources(self):
273 def find_resources(self):
274 subcommand = self.command_line_config.Global.subcommand
274 subcommand = self.command_line_config.Global.subcommand
275 if subcommand=='list':
275 if subcommand=='list':
276 self.list_cluster_dirs()
276 self.list_cluster_dirs()
277 # Exit immediately because there is nothing left to do.
277 # Exit immediately because there is nothing left to do.
278 self.exit()
278 self.exit()
279 elif subcommand=='create':
279 elif subcommand=='create':
280 self.auto_create_cluster_dir = True
280 self.auto_create_cluster_dir = True
281 super(IPClusterApp, self).find_resources()
281 super(IPClusterApp, self).find_resources()
282 elif subcommand=='start' or subcommand=='stop':
282 elif subcommand=='start' or subcommand=='stop':
283 self.auto_create_cluster_dir = True
283 self.auto_create_cluster_dir = True
284 try:
284 try:
285 super(IPClusterApp, self).find_resources()
285 super(IPClusterApp, self).find_resources()
286 except ClusterDirError:
286 except ClusterDirError:
287 raise ClusterDirError(
287 raise ClusterDirError(
288 "Could not find a cluster directory. A cluster dir must "
288 "Could not find a cluster directory. A cluster dir must "
289 "be created before running 'ipclusterz start'. Do "
289 "be created before running 'ipclusterz start'. Do "
290 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
290 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
291 "information about creating and listing cluster dirs."
291 "information about creating and listing cluster dirs."
292 )
292 )
293 elif subcommand=='engines':
293 elif subcommand=='engines':
294 self.auto_create_cluster_dir = False
294 self.auto_create_cluster_dir = False
295 try:
295 try:
296 super(IPClusterApp, self).find_resources()
296 super(IPClusterApp, self).find_resources()
297 except ClusterDirError:
297 except ClusterDirError:
298 raise ClusterDirError(
298 raise ClusterDirError(
299 "Could not find a cluster directory. A cluster dir must "
299 "Could not find a cluster directory. A cluster dir must "
300 "be created before running 'ipclusterz start'. Do "
300 "be created before running 'ipclusterz start'. Do "
301 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
301 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
302 "information about creating and listing cluster dirs."
302 "information about creating and listing cluster dirs."
303 )
303 )
304
304
305 def list_cluster_dirs(self):
305 def list_cluster_dirs(self):
306 # Find the search paths
306 # Find the search paths
307 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
307 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
308 if cluster_dir_paths:
308 if cluster_dir_paths:
309 cluster_dir_paths = cluster_dir_paths.split(':')
309 cluster_dir_paths = cluster_dir_paths.split(':')
310 else:
310 else:
311 cluster_dir_paths = []
311 cluster_dir_paths = []
312 try:
312 try:
313 ipython_dir = self.command_line_config.Global.ipython_dir
313 ipython_dir = self.command_line_config.Global.ipython_dir
314 except AttributeError:
314 except AttributeError:
315 ipython_dir = self.default_config.Global.ipython_dir
315 ipython_dir = self.default_config.Global.ipython_dir
316 paths = [os.getcwd(), ipython_dir] + \
316 paths = [os.getcwd(), ipython_dir] + \
317 cluster_dir_paths
317 cluster_dir_paths
318 paths = list(set(paths))
318 paths = list(set(paths))
319
319
320 self.log.info('Searching for cluster dirs in paths: %r' % paths)
320 self.log.info('Searching for cluster dirs in paths: %r' % paths)
321 for path in paths:
321 for path in paths:
322 files = os.listdir(path)
322 files = os.listdir(path)
323 for f in files:
323 for f in files:
324 full_path = os.path.join(path, f)
324 full_path = os.path.join(path, f)
325 if os.path.isdir(full_path) and f.startswith('cluster_'):
325 if os.path.isdir(full_path) and f.startswith('clusterz_'):
326 profile = full_path.split('_')[-1]
326 profile = full_path.split('_')[-1]
327 start_cmd = 'ipclusterz start -p %s -n 4' % profile
327 start_cmd = 'ipclusterz start -p %s -n 4' % profile
328 print start_cmd + " ==> " + full_path
328 print start_cmd + " ==> " + full_path
329
329
330 def pre_construct(self):
330 def pre_construct(self):
331 # IPClusterApp.pre_construct() is where we cd to the working directory.
331 # IPClusterApp.pre_construct() is where we cd to the working directory.
332 super(IPClusterApp, self).pre_construct()
332 super(IPClusterApp, self).pre_construct()
333 config = self.master_config
333 config = self.master_config
334 try:
334 try:
335 daemon = config.Global.daemonize
335 daemon = config.Global.daemonize
336 if daemon:
336 if daemon:
337 config.Global.log_to_file = True
337 config.Global.log_to_file = True
338 except AttributeError:
338 except AttributeError:
339 pass
339 pass
340
340
341 def construct(self):
341 def construct(self):
342 config = self.master_config
342 config = self.master_config
343 subcmd = config.Global.subcommand
343 subcmd = config.Global.subcommand
344 reset = config.Global.reset_config
344 reset = config.Global.reset_config
345 if subcmd == 'list':
345 if subcmd == 'list':
346 return
346 return
347 if subcmd == 'create':
347 if subcmd == 'create':
348 self.log.info('Copying default config files to cluster directory '
348 self.log.info('Copying default config files to cluster directory '
349 '[overwrite=%r]' % (reset,))
349 '[overwrite=%r]' % (reset,))
350 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
350 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
351 if subcmd =='start':
351 if subcmd =='start':
352 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
352 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
353 self.start_logging()
353 self.start_logging()
354 self.loop = ioloop.IOLoop.instance()
354 self.loop = ioloop.IOLoop.instance()
355 # reactor.callWhenRunning(self.start_launchers)
355 # reactor.callWhenRunning(self.start_launchers)
356 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
356 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
357 dc.start()
357 dc.start()
358 if subcmd == 'engines':
358 if subcmd == 'engines':
359 self.start_logging()
359 self.start_logging()
360 self.loop = ioloop.IOLoop.instance()
360 self.loop = ioloop.IOLoop.instance()
361 # reactor.callWhenRunning(self.start_launchers)
361 # reactor.callWhenRunning(self.start_launchers)
362 engine_only = lambda : self.start_launchers(controller=False)
362 engine_only = lambda : self.start_launchers(controller=False)
363 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
363 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
364 dc.start()
364 dc.start()
365
365
366 def start_launchers(self, controller=True):
366 def start_launchers(self, controller=True):
367 config = self.master_config
367 config = self.master_config
368
368
369 # Create the launchers. In both cases, we set the work_dir of
369 # Create the launchers. In both cases, we set the work_dir of
370 # the launcher to the cluster_dir. This is where the launcher's
370 # the launcher to the cluster_dir. This is where the launcher's
371 # subprocesses will be launched. It is not where the controller
371 # subprocesses will be launched. It is not where the controller
372 # and engine will be launched.
372 # and engine will be launched.
373 if controller:
373 if controller:
374 cl_class = import_item(config.Global.controller_launcher)
374 cl_class = import_item(config.Global.controller_launcher)
375 self.controller_launcher = cl_class(
375 self.controller_launcher = cl_class(
376 work_dir=self.cluster_dir, config=config,
376 work_dir=self.cluster_dir, config=config,
377 logname=self.log.name
377 logname=self.log.name
378 )
378 )
379 # Setup the observing of stopping. If the controller dies, shut
379 # Setup the observing of stopping. If the controller dies, shut
380 # everything down as that will be completely fatal for the engines.
380 # everything down as that will be completely fatal for the engines.
381 self.controller_launcher.on_stop(self.stop_launchers)
381 self.controller_launcher.on_stop(self.stop_launchers)
382 # But, we don't monitor the stopping of engines. An engine dying
382 # But, we don't monitor the stopping of engines. An engine dying
383 # is just fine and in principle a user could start a new engine.
383 # is just fine and in principle a user could start a new engine.
384 # Also, if we did monitor engine stopping, it is difficult to
384 # Also, if we did monitor engine stopping, it is difficult to
385 # know what to do when only some engines die. Currently, the
385 # know what to do when only some engines die. Currently, the
386 # observing of engine stopping is inconsistent. Some launchers
386 # observing of engine stopping is inconsistent. Some launchers
387 # might trigger on a single engine stopping, other wait until
387 # might trigger on a single engine stopping, other wait until
388 # all stop. TODO: think more about how to handle this.
388 # all stop. TODO: think more about how to handle this.
389 else:
389 else:
390 self.controller_launcher = None
390 self.controller_launcher = None
391
391
392 el_class = import_item(config.Global.engine_launcher)
392 el_class = import_item(config.Global.engine_launcher)
393 self.engine_launcher = el_class(
393 self.engine_launcher = el_class(
394 work_dir=self.cluster_dir, config=config, logname=self.log.name
394 work_dir=self.cluster_dir, config=config, logname=self.log.name
395 )
395 )
396
396
397 # Setup signals
397 # Setup signals
398 signal.signal(signal.SIGINT, self.sigint_handler)
398 signal.signal(signal.SIGINT, self.sigint_handler)
399
399
400 # Start the controller and engines
400 # Start the controller and engines
401 self._stopping = False # Make sure stop_launchers is not called 2x.
401 self._stopping = False # Make sure stop_launchers is not called 2x.
402 if controller:
402 if controller:
403 self.start_controller()
403 self.start_controller()
404 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
404 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
405 dc.start()
405 dc.start()
406 self.startup_message()
406 self.startup_message()
407
407
408 def startup_message(self, r=None):
408 def startup_message(self, r=None):
409 self.log.info("IPython cluster: started")
409 self.log.info("IPython cluster: started")
410 return r
410 return r
411
411
412 def start_controller(self, r=None):
412 def start_controller(self, r=None):
413 # self.log.info("In start_controller")
413 # self.log.info("In start_controller")
414 config = self.master_config
414 config = self.master_config
415 d = self.controller_launcher.start(
415 d = self.controller_launcher.start(
416 cluster_dir=config.Global.cluster_dir
416 cluster_dir=config.Global.cluster_dir
417 )
417 )
418 return d
418 return d
419
419
420 def start_engines(self, r=None):
420 def start_engines(self, r=None):
421 # self.log.info("In start_engines")
421 # self.log.info("In start_engines")
422 config = self.master_config
422 config = self.master_config
423
423
424 d = self.engine_launcher.start(
424 d = self.engine_launcher.start(
425 config.Global.n,
425 config.Global.n,
426 cluster_dir=config.Global.cluster_dir
426 cluster_dir=config.Global.cluster_dir
427 )
427 )
428 return d
428 return d
429
429
430 def stop_controller(self, r=None):
430 def stop_controller(self, r=None):
431 # self.log.info("In stop_controller")
431 # self.log.info("In stop_controller")
432 if self.controller_launcher and self.controller_launcher.running:
432 if self.controller_launcher and self.controller_launcher.running:
433 return self.controller_launcher.stop()
433 return self.controller_launcher.stop()
434
434
435 def stop_engines(self, r=None):
435 def stop_engines(self, r=None):
436 # self.log.info("In stop_engines")
436 # self.log.info("In stop_engines")
437 if self.engine_launcher.running:
437 if self.engine_launcher.running:
438 d = self.engine_launcher.stop()
438 d = self.engine_launcher.stop()
439 # d.addErrback(self.log_err)
439 # d.addErrback(self.log_err)
440 return d
440 return d
441 else:
441 else:
442 return None
442 return None
443
443
444 def log_err(self, f):
444 def log_err(self, f):
445 self.log.error(f.getTraceback())
445 self.log.error(f.getTraceback())
446 return None
446 return None
447
447
448 def stop_launchers(self, r=None):
448 def stop_launchers(self, r=None):
449 if not self._stopping:
449 if not self._stopping:
450 self._stopping = True
450 self._stopping = True
451 # if isinstance(r, failure.Failure):
451 # if isinstance(r, failure.Failure):
452 # self.log.error('Unexpected error in ipcluster:')
452 # self.log.error('Unexpected error in ipcluster:')
453 # self.log.info(r.getTraceback())
453 # self.log.info(r.getTraceback())
454 self.log.error("IPython cluster: stopping")
454 self.log.error("IPython cluster: stopping")
455 # These return deferreds. We are not doing anything with them
455 # These return deferreds. We are not doing anything with them
456 # but we are holding refs to them as a reminder that they
456 # but we are holding refs to them as a reminder that they
457 # do return deferreds.
457 # do return deferreds.
458 d1 = self.stop_engines()
458 d1 = self.stop_engines()
459 d2 = self.stop_controller()
459 d2 = self.stop_controller()
460 # Wait a few seconds to let things shut down.
460 # Wait a few seconds to let things shut down.
461 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
461 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
462 dc.start()
462 dc.start()
463 # reactor.callLater(4.0, reactor.stop)
463 # reactor.callLater(4.0, reactor.stop)
464
464
465 def sigint_handler(self, signum, frame):
465 def sigint_handler(self, signum, frame):
466 self.stop_launchers()
466 self.stop_launchers()
467
467
468 def start_logging(self):
468 def start_logging(self):
469 # Remove old log files of the controller and engine
469 # Remove old log files of the controller and engine
470 if self.master_config.Global.clean_logs:
470 if self.master_config.Global.clean_logs:
471 log_dir = self.master_config.Global.log_dir
471 log_dir = self.master_config.Global.log_dir
472 for f in os.listdir(log_dir):
472 for f in os.listdir(log_dir):
473 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
473 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
474 os.remove(os.path.join(log_dir, f))
474 os.remove(os.path.join(log_dir, f))
475 # This will remove old log files for ipcluster itself
475 # This will remove old log files for ipcluster itself
476 super(IPClusterApp, self).start_logging()
476 super(IPClusterApp, self).start_logging()
477
477
478 def start_app(self):
478 def start_app(self):
479 """Start the application, depending on what subcommand is used."""
479 """Start the application, depending on what subcommand is used."""
480 subcmd = self.master_config.Global.subcommand
480 subcmd = self.master_config.Global.subcommand
481 if subcmd=='create' or subcmd=='list':
481 if subcmd=='create' or subcmd=='list':
482 return
482 return
483 elif subcmd=='start':
483 elif subcmd=='start':
484 self.start_app_start()
484 self.start_app_start()
485 elif subcmd=='stop':
485 elif subcmd=='stop':
486 self.start_app_stop()
486 self.start_app_stop()
487 elif subcmd=='engines':
487 elif subcmd=='engines':
488 self.start_app_engines()
488 self.start_app_engines()
489
489
490 def start_app_start(self):
490 def start_app_start(self):
491 """Start the app for the start subcommand."""
491 """Start the app for the start subcommand."""
492 config = self.master_config
492 config = self.master_config
493 # First see if the cluster is already running
493 # First see if the cluster is already running
494 try:
494 try:
495 pid = self.get_pid_from_file()
495 pid = self.get_pid_from_file()
496 except PIDFileError:
496 except PIDFileError:
497 pass
497 pass
498 else:
498 else:
499 self.log.critical(
499 self.log.critical(
500 'Cluster is already running with [pid=%s]. '
500 'Cluster is already running with [pid=%s]. '
501 'use "ipclusterz stop" to stop the cluster.' % pid
501 'use "ipclusterz stop" to stop the cluster.' % pid
502 )
502 )
503 # Here I exit with a unusual exit status that other processes
503 # Here I exit with a unusual exit status that other processes
504 # can watch for to learn how I exited.
504 # can watch for to learn how I exited.
505 self.exit(ALREADY_STARTED)
505 self.exit(ALREADY_STARTED)
506
506
507 # Now log and daemonize
507 # Now log and daemonize
508 self.log.info(
508 self.log.info(
509 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
509 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
510 )
510 )
511 # TODO: Get daemonize working on Windows or as a Windows Server.
511 # TODO: Get daemonize working on Windows or as a Windows Server.
512 if config.Global.daemonize:
512 if config.Global.daemonize:
513 if os.name=='posix':
513 if os.name=='posix':
514 from twisted.scripts._twistd_unix import daemonize
514 from twisted.scripts._twistd_unix import daemonize
515 daemonize()
515 daemonize()
516
516
517 # Now write the new pid file AFTER our new forked pid is active.
517 # Now write the new pid file AFTER our new forked pid is active.
518 self.write_pid_file()
518 self.write_pid_file()
519 try:
519 try:
520 self.loop.start()
520 self.loop.start()
521 except KeyboardInterrupt:
521 except KeyboardInterrupt:
522 pass
522 pass
523 except zmq.ZMQError as e:
523 except zmq.ZMQError as e:
524 if e.errno == errno.EINTR:
524 if e.errno == errno.EINTR:
525 pass
525 pass
526 else:
526 else:
527 raise
527 raise
528 self.remove_pid_file()
528 self.remove_pid_file()
529
529
530 def start_app_engines(self):
530 def start_app_engines(self):
531 """Start the app for the start subcommand."""
531 """Start the app for the start subcommand."""
532 config = self.master_config
532 config = self.master_config
533 # First see if the cluster is already running
533 # First see if the cluster is already running
534
534
535 # Now log and daemonize
535 # Now log and daemonize
536 self.log.info(
536 self.log.info(
537 'Starting engines with [daemon=%r]' % config.Global.daemonize
537 'Starting engines with [daemon=%r]' % config.Global.daemonize
538 )
538 )
539 # TODO: Get daemonize working on Windows or as a Windows Server.
539 # TODO: Get daemonize working on Windows or as a Windows Server.
540 if config.Global.daemonize:
540 if config.Global.daemonize:
541 if os.name=='posix':
541 if os.name=='posix':
542 from twisted.scripts._twistd_unix import daemonize
542 from twisted.scripts._twistd_unix import daemonize
543 daemonize()
543 daemonize()
544
544
545 # Now write the new pid file AFTER our new forked pid is active.
545 # Now write the new pid file AFTER our new forked pid is active.
546 # self.write_pid_file()
546 # self.write_pid_file()
547 try:
547 try:
548 self.loop.start()
548 self.loop.start()
549 except KeyboardInterrupt:
549 except KeyboardInterrupt:
550 pass
550 pass
551 except zmq.ZMQError as e:
551 except zmq.ZMQError as e:
552 if e.errno == errno.EINTR:
552 if e.errno == errno.EINTR:
553 pass
553 pass
554 else:
554 else:
555 raise
555 raise
556 # self.remove_pid_file()
556 # self.remove_pid_file()
557
557
558 def start_app_stop(self):
558 def start_app_stop(self):
559 """Start the app for the stop subcommand."""
559 """Start the app for the stop subcommand."""
560 config = self.master_config
560 config = self.master_config
561 try:
561 try:
562 pid = self.get_pid_from_file()
562 pid = self.get_pid_from_file()
563 except PIDFileError:
563 except PIDFileError:
564 self.log.critical(
564 self.log.critical(
565 'Problem reading pid file, cluster is probably not running.'
565 'Problem reading pid file, cluster is probably not running.'
566 )
566 )
567 # Here I exit with a unusual exit status that other processes
567 # Here I exit with a unusual exit status that other processes
568 # can watch for to learn how I exited.
568 # can watch for to learn how I exited.
569 self.exit(ALREADY_STOPPED)
569 self.exit(ALREADY_STOPPED)
570 else:
570 else:
571 if os.name=='posix':
571 if os.name=='posix':
572 sig = config.Global.signal
572 sig = config.Global.signal
573 self.log.info(
573 self.log.info(
574 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
574 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
575 )
575 )
576 os.kill(pid, sig)
576 os.kill(pid, sig)
577 elif os.name=='nt':
577 elif os.name=='nt':
578 # As of right now, we don't support daemonize on Windows, so
578 # As of right now, we don't support daemonize on Windows, so
579 # stop will not do anything. Minimally, it should clean up the
579 # stop will not do anything. Minimally, it should clean up the
580 # old .pid files.
580 # old .pid files.
581 self.remove_pid_file()
581 self.remove_pid_file()
582
582
583
583
584 def launch_new_instance():
584 def launch_new_instance():
585 """Create and run the IPython cluster."""
585 """Create and run the IPython cluster."""
586 app = IPClusterApp()
586 app = IPClusterApp()
587 app.start()
587 app.start()
588
588
589
589
590 if __name__ == '__main__':
590 if __name__ == '__main__':
591 launch_new_instance()
591 launch_new_instance()
592
592
@@ -1,427 +1,427 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import logging
22 import logging
23 import socket
23 import socket
24 import stat
24 import stat
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 import zmq
28 import zmq
29 from zmq.log.handlers import PUBHandler
29 from zmq.log.handlers import PUBHandler
30 from zmq.utils import jsonapi as json
30 from zmq.utils import jsonapi as json
31
31
32 from IPython.config.loader import Config
32 from IPython.config.loader import Config
33 from IPython.zmq.parallel import factory
33 from IPython.zmq.parallel import factory
34 from IPython.zmq.parallel.controller import ControllerFactory
34 from IPython.zmq.parallel.controller import ControllerFactory
35 from IPython.zmq.parallel.clusterdir import (
35 from IPython.zmq.parallel.clusterdir import (
36 ApplicationWithClusterDir,
36 ApplicationWithClusterDir,
37 ClusterDirConfigLoader
37 ClusterDirConfigLoader
38 )
38 )
39 from IPython.zmq.parallel.util import disambiguate_ip_address, split_url
39 from IPython.zmq.parallel.util import disambiguate_ip_address, split_url
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 from IPython.utils.traitlets import Instance, Unicode
41 from IPython.utils.traitlets import Instance, Unicode
42
42
43
43
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # Module level variables
46 # Module level variables
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49
49
50 #: The default config file name for this application
50 #: The default config file name for this application
51 default_config_file_name = u'ipcontrollerz_config.py'
51 default_config_file_name = u'ipcontrollerz_config.py'
52
52
53
53
54 _description = """Start the IPython controller for parallel computing.
54 _description = """Start the IPython controller for parallel computing.
55
55
56 The IPython controller provides a gateway between the IPython engines and
56 The IPython controller provides a gateway between the IPython engines and
57 clients. The controller needs to be started before the engines and can be
57 clients. The controller needs to be started before the engines and can be
58 configured using command line options or using a cluster directory. Cluster
58 configured using command line options or using a cluster directory. Cluster
59 directories contain config, log and security files and are usually located in
59 directories contain config, log and security files and are usually located in
60 your ipython directory and named as "cluster_<profile>". See the --profile
60 your ipython directory and named as "clusterz_<profile>". See the --profile
61 and --cluster-dir options for details.
61 and --cluster-dir options for details.
62 """
62 """
63
63
64 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
65 # Default interfaces
65 # Default interfaces
66 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
67
67
68 # The default client interfaces for FCClientServiceFactory.interfaces
68 # The default client interfaces for FCClientServiceFactory.interfaces
69 default_client_interfaces = Config()
69 default_client_interfaces = Config()
70 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
70 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
71
71
72 # Make this a dict we can pass to Config.__init__ for the default
72 # Make this a dict we can pass to Config.__init__ for the default
73 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
73 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
74
74
75
75
76
76
77 # The default engine interfaces for FCEngineServiceFactory.interfaces
77 # The default engine interfaces for FCEngineServiceFactory.interfaces
78 default_engine_interfaces = Config()
78 default_engine_interfaces = Config()
79 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
79 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
80
80
81 # Make this a dict we can pass to Config.__init__ for the default
81 # Make this a dict we can pass to Config.__init__ for the default
82 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
82 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
83
83
84
84
85 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
86 # Service factories
86 # Service factories
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88
88
89 #
89 #
90 # class FCClientServiceFactory(FCServiceFactory):
90 # class FCClientServiceFactory(FCServiceFactory):
91 # """A Foolscap implementation of the client services."""
91 # """A Foolscap implementation of the client services."""
92 #
92 #
93 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
93 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
94 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
94 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
95 # allow_none=False, config=True)
95 # allow_none=False, config=True)
96 #
96 #
97 #
97 #
98 # class FCEngineServiceFactory(FCServiceFactory):
98 # class FCEngineServiceFactory(FCServiceFactory):
99 # """A Foolscap implementation of the engine services."""
99 # """A Foolscap implementation of the engine services."""
100 #
100 #
101 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
101 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
102 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
102 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
103 # allow_none=False, config=True)
103 # allow_none=False, config=True)
104 #
104 #
105
105
106 #-----------------------------------------------------------------------------
106 #-----------------------------------------------------------------------------
107 # Command line options
107 # Command line options
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109
109
110
110
111 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
111 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
112
112
113 def _add_arguments(self):
113 def _add_arguments(self):
114 super(IPControllerAppConfigLoader, self)._add_arguments()
114 super(IPControllerAppConfigLoader, self)._add_arguments()
115 paa = self.parser.add_argument
115 paa = self.parser.add_argument
116
116
117 ## Hub Config:
117 ## Hub Config:
118 paa('--mongodb',
118 paa('--mongodb',
119 dest='HubFactory.db_class', action='store_const',
119 dest='HubFactory.db_class', action='store_const',
120 const='IPython.zmq.parallel.mongodb.MongoDB',
120 const='IPython.zmq.parallel.mongodb.MongoDB',
121 help='Use MongoDB task storage [default: in-memory]')
121 help='Use MongoDB task storage [default: in-memory]')
122 paa('--hb',
122 paa('--hb',
123 type=int, dest='HubFactory.hb', nargs=2,
123 type=int, dest='HubFactory.hb', nargs=2,
124 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
124 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
125 'connections [default: random]',
125 'connections [default: random]',
126 metavar='Hub.hb_ports')
126 metavar='Hub.hb_ports')
127 paa('--ping',
127 paa('--ping',
128 type=int, dest='HubFactory.ping',
128 type=int, dest='HubFactory.ping',
129 help='The frequency at which the Hub pings the engines for heartbeats '
129 help='The frequency at which the Hub pings the engines for heartbeats '
130 ' (in ms) [default: 100]',
130 ' (in ms) [default: 100]',
131 metavar='Hub.ping')
131 metavar='Hub.ping')
132
132
133 # Client config
133 # Client config
134 paa('--client-ip',
134 paa('--client-ip',
135 type=str, dest='HubFactory.client_ip',
135 type=str, dest='HubFactory.client_ip',
136 help='The IP address or hostname the Hub will listen on for '
136 help='The IP address or hostname the Hub will listen on for '
137 'client connections. Both engine-ip and client-ip can be set simultaneously '
137 'client connections. Both engine-ip and client-ip can be set simultaneously '
138 'via --ip [default: loopback]',
138 'via --ip [default: loopback]',
139 metavar='Hub.client_ip')
139 metavar='Hub.client_ip')
140 paa('--client-transport',
140 paa('--client-transport',
141 type=str, dest='HubFactory.client_transport',
141 type=str, dest='HubFactory.client_transport',
142 help='The ZeroMQ transport the Hub will use for '
142 help='The ZeroMQ transport the Hub will use for '
143 'client connections. Both engine-transport and client-transport can be set simultaneously '
143 'client connections. Both engine-transport and client-transport can be set simultaneously '
144 'via --transport [default: tcp]',
144 'via --transport [default: tcp]',
145 metavar='Hub.client_transport')
145 metavar='Hub.client_transport')
146 paa('--query',
146 paa('--query',
147 type=int, dest='HubFactory.query_port',
147 type=int, dest='HubFactory.query_port',
148 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
148 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
149 metavar='Hub.query_port')
149 metavar='Hub.query_port')
150 paa('--notifier',
150 paa('--notifier',
151 type=int, dest='HubFactory.notifier_port',
151 type=int, dest='HubFactory.notifier_port',
152 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
152 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
153 metavar='Hub.notifier_port')
153 metavar='Hub.notifier_port')
154
154
155 # Engine config
155 # Engine config
156 paa('--engine-ip',
156 paa('--engine-ip',
157 type=str, dest='HubFactory.engine_ip',
157 type=str, dest='HubFactory.engine_ip',
158 help='The IP address or hostname the Hub will listen on for '
158 help='The IP address or hostname the Hub will listen on for '
159 'engine connections. This applies to the Hub and its schedulers'
159 'engine connections. This applies to the Hub and its schedulers'
160 'engine-ip and client-ip can be set simultaneously '
160 'engine-ip and client-ip can be set simultaneously '
161 'via --ip [default: loopback]',
161 'via --ip [default: loopback]',
162 metavar='Hub.engine_ip')
162 metavar='Hub.engine_ip')
163 paa('--engine-transport',
163 paa('--engine-transport',
164 type=str, dest='HubFactory.engine_transport',
164 type=str, dest='HubFactory.engine_transport',
165 help='The ZeroMQ transport the Hub will use for '
165 help='The ZeroMQ transport the Hub will use for '
166 'client connections. Both engine-transport and client-transport can be set simultaneously '
166 'client connections. Both engine-transport and client-transport can be set simultaneously '
167 'via --transport [default: tcp]',
167 'via --transport [default: tcp]',
168 metavar='Hub.engine_transport')
168 metavar='Hub.engine_transport')
169
169
170 # Scheduler config
170 # Scheduler config
171 paa('--mux',
171 paa('--mux',
172 type=int, dest='ControllerFactory.mux', nargs=2,
172 type=int, dest='ControllerFactory.mux', nargs=2,
173 help='The (2) ports the MUX scheduler will listen on for client,engine '
173 help='The (2) ports the MUX scheduler will listen on for client,engine '
174 'connections, respectively [default: random]',
174 'connections, respectively [default: random]',
175 metavar='Scheduler.mux_ports')
175 metavar='Scheduler.mux_ports')
176 paa('--task',
176 paa('--task',
177 type=int, dest='ControllerFactory.task', nargs=2,
177 type=int, dest='ControllerFactory.task', nargs=2,
178 help='The (2) ports the Task scheduler will listen on for client,engine '
178 help='The (2) ports the Task scheduler will listen on for client,engine '
179 'connections, respectively [default: random]',
179 'connections, respectively [default: random]',
180 metavar='Scheduler.task_ports')
180 metavar='Scheduler.task_ports')
181 paa('--control',
181 paa('--control',
182 type=int, dest='ControllerFactory.control', nargs=2,
182 type=int, dest='ControllerFactory.control', nargs=2,
183 help='The (2) ports the Control scheduler will listen on for client,engine '
183 help='The (2) ports the Control scheduler will listen on for client,engine '
184 'connections, respectively [default: random]',
184 'connections, respectively [default: random]',
185 metavar='Scheduler.control_ports')
185 metavar='Scheduler.control_ports')
186 paa('--iopub',
186 paa('--iopub',
187 type=int, dest='ControllerFactory.iopub', nargs=2,
187 type=int, dest='ControllerFactory.iopub', nargs=2,
188 help='The (2) ports the IOPub scheduler will listen on for client,engine '
188 help='The (2) ports the IOPub scheduler will listen on for client,engine '
189 'connections, respectively [default: random]',
189 'connections, respectively [default: random]',
190 metavar='Scheduler.iopub_ports')
190 metavar='Scheduler.iopub_ports')
191
191
192 paa('--scheme',
192 paa('--scheme',
193 type=str, dest='HubFactory.scheme',
193 type=str, dest='HubFactory.scheme',
194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 help='select the task scheduler scheme [default: Python LRU]',
195 help='select the task scheduler scheme [default: Python LRU]',
196 metavar='Scheduler.scheme')
196 metavar='Scheduler.scheme')
197 paa('--usethreads',
197 paa('--usethreads',
198 dest='ControllerFactory.usethreads', action="store_true",
198 dest='ControllerFactory.usethreads', action="store_true",
199 help='Use threads instead of processes for the schedulers',
199 help='Use threads instead of processes for the schedulers',
200 )
200 )
201 paa('--hwm',
201 paa('--hwm',
202 dest='ControllerFactory.hwm', type=int,
202 dest='ControllerFactory.hwm', type=int,
203 help='specify the High Water Mark (HWM) for the downstream '
203 help='specify the High Water Mark (HWM) for the downstream '
204 'socket in the pure ZMQ scheduler. This is the maximum number '
204 'socket in the pure ZMQ scheduler. This is the maximum number '
205 'of allowed outstanding tasks on each engine.',
205 'of allowed outstanding tasks on each engine.',
206 )
206 )
207
207
208 ## Global config
208 ## Global config
209 paa('--log-to-file',
209 paa('--log-to-file',
210 action='store_true', dest='Global.log_to_file',
210 action='store_true', dest='Global.log_to_file',
211 help='Log to a file in the log directory (default is stdout)')
211 help='Log to a file in the log directory (default is stdout)')
212 paa('--log-url',
212 paa('--log-url',
213 type=str, dest='Global.log_url',
213 type=str, dest='Global.log_url',
214 help='Broadcast logs to an iploggerz process [default: disabled]')
214 help='Broadcast logs to an iploggerz process [default: disabled]')
215 paa('-r','--reuse-files',
215 paa('-r','--reuse-files',
216 action='store_true', dest='Global.reuse_files',
216 action='store_true', dest='Global.reuse_files',
217 help='Try to reuse existing json connection files.')
217 help='Try to reuse existing json connection files.')
218 paa('--no-secure',
218 paa('--no-secure',
219 action='store_false', dest='Global.secure',
219 action='store_false', dest='Global.secure',
220 help='Turn off execution keys (default).')
220 help='Turn off execution keys (default).')
221 paa('--secure',
221 paa('--secure',
222 action='store_true', dest='Global.secure',
222 action='store_true', dest='Global.secure',
223 help='Turn on execution keys.')
223 help='Turn on execution keys.')
224 paa('--execkey',
224 paa('--execkey',
225 type=str, dest='Global.exec_key',
225 type=str, dest='Global.exec_key',
226 help='path to a file containing an execution key.',
226 help='path to a file containing an execution key.',
227 metavar='keyfile')
227 metavar='keyfile')
228 paa('--ssh',
228 paa('--ssh',
229 type=str, dest='Global.sshserver',
229 type=str, dest='Global.sshserver',
230 help='ssh url for clients to use when connecting to the Controller '
230 help='ssh url for clients to use when connecting to the Controller '
231 'processes. It should be of the form: [user@]server[:port]. The '
231 'processes. It should be of the form: [user@]server[:port]. The '
232 'Controller\'s listening addresses must be accessible from the ssh server',
232 'Controller\'s listening addresses must be accessible from the ssh server',
233 metavar='Global.sshserver')
233 metavar='Global.sshserver')
234 paa('--location',
234 paa('--location',
235 type=str, dest='Global.location',
235 type=str, dest='Global.location',
236 help="The external IP or domain name of this machine, used for disambiguating "
236 help="The external IP or domain name of this machine, used for disambiguating "
237 "engine and client connections.",
237 "engine and client connections.",
238 metavar='Global.location')
238 metavar='Global.location')
239 factory.add_session_arguments(self.parser)
239 factory.add_session_arguments(self.parser)
240 factory.add_registration_arguments(self.parser)
240 factory.add_registration_arguments(self.parser)
241
241
242
242
243 #-----------------------------------------------------------------------------
243 #-----------------------------------------------------------------------------
244 # The main application
244 # The main application
245 #-----------------------------------------------------------------------------
245 #-----------------------------------------------------------------------------
246
246
247
247
248 class IPControllerApp(ApplicationWithClusterDir):
248 class IPControllerApp(ApplicationWithClusterDir):
249
249
250 name = u'ipcontrollerz'
250 name = u'ipcontrollerz'
251 description = _description
251 description = _description
252 command_line_loader = IPControllerAppConfigLoader
252 command_line_loader = IPControllerAppConfigLoader
253 default_config_file_name = default_config_file_name
253 default_config_file_name = default_config_file_name
254 auto_create_cluster_dir = True
254 auto_create_cluster_dir = True
255
255
256
256
257 def create_default_config(self):
257 def create_default_config(self):
258 super(IPControllerApp, self).create_default_config()
258 super(IPControllerApp, self).create_default_config()
259 # Don't set defaults for Global.secure or Global.reuse_furls
259 # Don't set defaults for Global.secure or Global.reuse_furls
260 # as those are set in a component.
260 # as those are set in a component.
261 self.default_config.Global.import_statements = []
261 self.default_config.Global.import_statements = []
262 self.default_config.Global.clean_logs = True
262 self.default_config.Global.clean_logs = True
263 self.default_config.Global.secure = True
263 self.default_config.Global.secure = True
264 self.default_config.Global.reuse_files = False
264 self.default_config.Global.reuse_files = False
265 self.default_config.Global.exec_key = "exec_key.key"
265 self.default_config.Global.exec_key = "exec_key.key"
266 self.default_config.Global.sshserver = None
266 self.default_config.Global.sshserver = None
267 self.default_config.Global.location = None
267 self.default_config.Global.location = None
268
268
269 def pre_construct(self):
269 def pre_construct(self):
270 super(IPControllerApp, self).pre_construct()
270 super(IPControllerApp, self).pre_construct()
271 c = self.master_config
271 c = self.master_config
272 # The defaults for these are set in FCClientServiceFactory and
272 # The defaults for these are set in FCClientServiceFactory and
273 # FCEngineServiceFactory, so we only set them here if the global
273 # FCEngineServiceFactory, so we only set them here if the global
274 # options have be set to override the class level defaults.
274 # options have be set to override the class level defaults.
275
275
276 # if hasattr(c.Global, 'reuse_furls'):
276 # if hasattr(c.Global, 'reuse_furls'):
277 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
277 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
278 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
278 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
279 # del c.Global.reuse_furls
279 # del c.Global.reuse_furls
280 # if hasattr(c.Global, 'secure'):
280 # if hasattr(c.Global, 'secure'):
281 # c.FCClientServiceFactory.secure = c.Global.secure
281 # c.FCClientServiceFactory.secure = c.Global.secure
282 # c.FCEngineServiceFactory.secure = c.Global.secure
282 # c.FCEngineServiceFactory.secure = c.Global.secure
283 # del c.Global.secure
283 # del c.Global.secure
284
284
285 def save_connection_dict(self, fname, cdict):
285 def save_connection_dict(self, fname, cdict):
286 """save a connection dict to json file."""
286 """save a connection dict to json file."""
287 c = self.master_config
287 c = self.master_config
288 url = cdict['url']
288 url = cdict['url']
289 location = cdict['location']
289 location = cdict['location']
290 if not location:
290 if not location:
291 try:
291 try:
292 proto,ip,port = split_url(url)
292 proto,ip,port = split_url(url)
293 except AssertionError:
293 except AssertionError:
294 pass
294 pass
295 else:
295 else:
296 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
296 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
297 cdict['location'] = location
297 cdict['location'] = location
298 fname = os.path.join(c.Global.security_dir, fname)
298 fname = os.path.join(c.Global.security_dir, fname)
299 with open(fname, 'w') as f:
299 with open(fname, 'w') as f:
300 f.write(json.dumps(cdict, indent=2))
300 f.write(json.dumps(cdict, indent=2))
301 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
301 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
302
302
303 def load_config_from_json(self):
303 def load_config_from_json(self):
304 """load config from existing json connector files."""
304 """load config from existing json connector files."""
305 c = self.master_config
305 c = self.master_config
306 # load from engine config
306 # load from engine config
307 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
307 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
308 cfg = json.loads(f.read())
308 cfg = json.loads(f.read())
309 key = c.SessionFactory.exec_key = cfg['exec_key']
309 key = c.SessionFactory.exec_key = cfg['exec_key']
310 xport,addr = cfg['url'].split('://')
310 xport,addr = cfg['url'].split('://')
311 c.HubFactory.engine_transport = xport
311 c.HubFactory.engine_transport = xport
312 ip,ports = addr.split(':')
312 ip,ports = addr.split(':')
313 c.HubFactory.engine_ip = ip
313 c.HubFactory.engine_ip = ip
314 c.HubFactory.regport = int(ports)
314 c.HubFactory.regport = int(ports)
315 c.Global.location = cfg['location']
315 c.Global.location = cfg['location']
316
316
317 # load client config
317 # load client config
318 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
318 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
319 cfg = json.loads(f.read())
319 cfg = json.loads(f.read())
320 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
320 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
321 xport,addr = cfg['url'].split('://')
321 xport,addr = cfg['url'].split('://')
322 c.HubFactory.client_transport = xport
322 c.HubFactory.client_transport = xport
323 ip,ports = addr.split(':')
323 ip,ports = addr.split(':')
324 c.HubFactory.client_ip = ip
324 c.HubFactory.client_ip = ip
325 c.Global.sshserver = cfg['ssh']
325 c.Global.sshserver = cfg['ssh']
326 assert int(ports) == c.HubFactory.regport, "regport mismatch"
326 assert int(ports) == c.HubFactory.regport, "regport mismatch"
327
327
328 def construct(self):
328 def construct(self):
329 # This is the working dir by now.
329 # This is the working dir by now.
330 sys.path.insert(0, '')
330 sys.path.insert(0, '')
331 c = self.master_config
331 c = self.master_config
332
332
333 self.import_statements()
333 self.import_statements()
334 reusing = c.Global.reuse_files
334 reusing = c.Global.reuse_files
335 if reusing:
335 if reusing:
336 try:
336 try:
337 self.load_config_from_json()
337 self.load_config_from_json()
338 except (AssertionError,IOError):
338 except (AssertionError,IOError):
339 reusing=False
339 reusing=False
340 # check again, because reusing may have failed:
340 # check again, because reusing may have failed:
341 if reusing:
341 if reusing:
342 pass
342 pass
343 elif c.Global.secure:
343 elif c.Global.secure:
344 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
344 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
345 key = str(uuid.uuid4())
345 key = str(uuid.uuid4())
346 with open(keyfile, 'w') as f:
346 with open(keyfile, 'w') as f:
347 f.write(key)
347 f.write(key)
348 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
348 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
349 c.SessionFactory.exec_key = key
349 c.SessionFactory.exec_key = key
350 else:
350 else:
351 c.SessionFactory.exec_key = ''
351 c.SessionFactory.exec_key = ''
352 key = None
352 key = None
353
353
354 try:
354 try:
355 self.factory = ControllerFactory(config=c, logname=self.log.name)
355 self.factory = ControllerFactory(config=c, logname=self.log.name)
356 self.start_logging()
356 self.start_logging()
357 self.factory.construct()
357 self.factory.construct()
358 except:
358 except:
359 self.log.error("Couldn't construct the Controller", exc_info=True)
359 self.log.error("Couldn't construct the Controller", exc_info=True)
360 self.exit(1)
360 self.exit(1)
361
361
362 if not reusing:
362 if not reusing:
363 # save to new json config files
363 # save to new json config files
364 f = self.factory
364 f = self.factory
365 cdict = {'exec_key' : key,
365 cdict = {'exec_key' : key,
366 'ssh' : c.Global.sshserver,
366 'ssh' : c.Global.sshserver,
367 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
367 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
368 'location' : c.Global.location
368 'location' : c.Global.location
369 }
369 }
370 self.save_connection_dict('ipcontroller-client.json', cdict)
370 self.save_connection_dict('ipcontroller-client.json', cdict)
371 edict = cdict
371 edict = cdict
372 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
372 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
373 self.save_connection_dict('ipcontroller-engine.json', edict)
373 self.save_connection_dict('ipcontroller-engine.json', edict)
374
374
375
375
376 def save_urls(self):
376 def save_urls(self):
377 """save the registration urls to files."""
377 """save the registration urls to files."""
378 c = self.master_config
378 c = self.master_config
379
379
380 sec_dir = c.Global.security_dir
380 sec_dir = c.Global.security_dir
381 cf = self.factory
381 cf = self.factory
382
382
383 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
383 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
384 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
384 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
385
385
386 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
386 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
387 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
387 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
388
388
389
389
390 def import_statements(self):
390 def import_statements(self):
391 statements = self.master_config.Global.import_statements
391 statements = self.master_config.Global.import_statements
392 for s in statements:
392 for s in statements:
393 try:
393 try:
394 self.log.msg("Executing statement: '%s'" % s)
394 self.log.msg("Executing statement: '%s'" % s)
395 exec s in globals(), locals()
395 exec s in globals(), locals()
396 except:
396 except:
397 self.log.msg("Error running statement: %s" % s)
397 self.log.msg("Error running statement: %s" % s)
398
398
399 def start_logging(self):
399 def start_logging(self):
400 super(IPControllerApp, self).start_logging()
400 super(IPControllerApp, self).start_logging()
401 if self.master_config.Global.log_url:
401 if self.master_config.Global.log_url:
402 context = self.factory.context
402 context = self.factory.context
403 lsock = context.socket(zmq.PUB)
403 lsock = context.socket(zmq.PUB)
404 lsock.connect(self.master_config.Global.log_url)
404 lsock.connect(self.master_config.Global.log_url)
405 handler = PUBHandler(lsock)
405 handler = PUBHandler(lsock)
406 handler.root_topic = 'controller'
406 handler.root_topic = 'controller'
407 handler.setLevel(self.log_level)
407 handler.setLevel(self.log_level)
408 self.log.addHandler(handler)
408 self.log.addHandler(handler)
409 #
409 #
410 def start_app(self):
410 def start_app(self):
411 # Start the subprocesses:
411 # Start the subprocesses:
412 self.factory.start()
412 self.factory.start()
413 self.write_pid_file(overwrite=True)
413 self.write_pid_file(overwrite=True)
414 try:
414 try:
415 self.factory.loop.start()
415 self.factory.loop.start()
416 except KeyboardInterrupt:
416 except KeyboardInterrupt:
417 self.log.critical("Interrupted, Exiting...\n")
417 self.log.critical("Interrupted, Exiting...\n")
418
418
419
419
420 def launch_new_instance():
420 def launch_new_instance():
421 """Create and run the IPython controller"""
421 """Create and run the IPython controller"""
422 app = IPControllerApp()
422 app = IPControllerApp()
423 app.start()
423 app.start()
424
424
425
425
426 if __name__ == '__main__':
426 if __name__ == '__main__':
427 launch_new_instance()
427 launch_new_instance()
@@ -1,294 +1,294 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.zmq.parallel.clusterdir import (
25 from IPython.zmq.parallel.clusterdir import (
26 ApplicationWithClusterDir,
26 ApplicationWithClusterDir,
27 ClusterDirConfigLoader
27 ClusterDirConfigLoader
28 )
28 )
29 from IPython.zmq.log import EnginePUBHandler
29 from IPython.zmq.log import EnginePUBHandler
30
30
31 from IPython.zmq.parallel import factory
31 from IPython.zmq.parallel import factory
32 from IPython.zmq.parallel.engine import EngineFactory
32 from IPython.zmq.parallel.engine import EngineFactory
33 from IPython.zmq.parallel.streamkernel import Kernel
33 from IPython.zmq.parallel.streamkernel import Kernel
34 from IPython.zmq.parallel.util import disambiguate_url
34 from IPython.zmq.parallel.util import disambiguate_url
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36
36
37
37
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39 # Module level variables
39 # Module level variables
40 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
41
41
42 #: The default config file name for this application
42 #: The default config file name for this application
43 default_config_file_name = u'ipenginez_config.py'
43 default_config_file_name = u'ipenginez_config.py'
44
44
45
45
46 mpi4py_init = """from mpi4py import MPI as mpi
46 mpi4py_init = """from mpi4py import MPI as mpi
47 mpi.size = mpi.COMM_WORLD.Get_size()
47 mpi.size = mpi.COMM_WORLD.Get_size()
48 mpi.rank = mpi.COMM_WORLD.Get_rank()
48 mpi.rank = mpi.COMM_WORLD.Get_rank()
49 """
49 """
50
50
51
51
52 pytrilinos_init = """from PyTrilinos import Epetra
52 pytrilinos_init = """from PyTrilinos import Epetra
53 class SimpleStruct:
53 class SimpleStruct:
54 pass
54 pass
55 mpi = SimpleStruct()
55 mpi = SimpleStruct()
56 mpi.rank = 0
56 mpi.rank = 0
57 mpi.size = 0
57 mpi.size = 0
58 """
58 """
59
59
60
60
61 _description = """Start an IPython engine for parallel computing.\n\n
61 _description = """Start an IPython engine for parallel computing.\n\n
62
62
63 IPython engines run in parallel and perform computations on behalf of a client
63 IPython engines run in parallel and perform computations on behalf of a client
64 and controller. A controller needs to be started before the engines. The
64 and controller. A controller needs to be started before the engines. The
65 engine can be configured using command line options or using a cluster
65 engine can be configured using command line options or using a cluster
66 directory. Cluster directories contain config, log and security files and are
66 directory. Cluster directories contain config, log and security files and are
67 usually located in your ipython directory and named as "cluster_<profile>".
67 usually located in your ipython directory and named as "clusterz_<profile>".
68 See the --profile and --cluster-dir options for details.
68 See the --profile and --cluster-dir options for details.
69 """
69 """
70
70
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72 # Command line options
72 # Command line options
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74
74
75
75
76 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
76 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
77
77
78 def _add_arguments(self):
78 def _add_arguments(self):
79 super(IPEngineAppConfigLoader, self)._add_arguments()
79 super(IPEngineAppConfigLoader, self)._add_arguments()
80 paa = self.parser.add_argument
80 paa = self.parser.add_argument
81 # Controller config
81 # Controller config
82 paa('--file', '-f',
82 paa('--file', '-f',
83 type=unicode, dest='Global.url_file',
83 type=unicode, dest='Global.url_file',
84 help='The full location of the file containing the connection information fo '
84 help='The full location of the file containing the connection information fo '
85 'controller. If this is not given, the file must be in the '
85 'controller. If this is not given, the file must be in the '
86 'security directory of the cluster directory. This location is '
86 'security directory of the cluster directory. This location is '
87 'resolved using the --profile and --app-dir options.',
87 'resolved using the --profile and --app-dir options.',
88 metavar='Global.url_file')
88 metavar='Global.url_file')
89 # MPI
89 # MPI
90 paa('--mpi',
90 paa('--mpi',
91 type=str, dest='MPI.use',
91 type=str, dest='MPI.use',
92 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
92 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
93 metavar='MPI.use')
93 metavar='MPI.use')
94 # Global config
94 # Global config
95 paa('--log-to-file',
95 paa('--log-to-file',
96 action='store_true', dest='Global.log_to_file',
96 action='store_true', dest='Global.log_to_file',
97 help='Log to a file in the log directory (default is stdout)')
97 help='Log to a file in the log directory (default is stdout)')
98 paa('--log-url',
98 paa('--log-url',
99 dest='Global.log_url',
99 dest='Global.log_url',
100 help="url of ZMQ logger, as started with iploggerz")
100 help="url of ZMQ logger, as started with iploggerz")
101 # paa('--execkey',
101 # paa('--execkey',
102 # type=str, dest='Global.exec_key',
102 # type=str, dest='Global.exec_key',
103 # help='path to a file containing an execution key.',
103 # help='path to a file containing an execution key.',
104 # metavar='keyfile')
104 # metavar='keyfile')
105 # paa('--no-secure',
105 # paa('--no-secure',
106 # action='store_false', dest='Global.secure',
106 # action='store_false', dest='Global.secure',
107 # help='Turn off execution keys.')
107 # help='Turn off execution keys.')
108 # paa('--secure',
108 # paa('--secure',
109 # action='store_true', dest='Global.secure',
109 # action='store_true', dest='Global.secure',
110 # help='Turn on execution keys (default).')
110 # help='Turn on execution keys (default).')
111 # init command
111 # init command
112 paa('-c',
112 paa('-c',
113 type=str, dest='Global.extra_exec_lines',
113 type=str, dest='Global.extra_exec_lines',
114 help='specify a command to be run at startup')
114 help='specify a command to be run at startup')
115
115
116 factory.add_session_arguments(self.parser)
116 factory.add_session_arguments(self.parser)
117 factory.add_registration_arguments(self.parser)
117 factory.add_registration_arguments(self.parser)
118
118
119
119
120 #-----------------------------------------------------------------------------
120 #-----------------------------------------------------------------------------
121 # Main application
121 # Main application
122 #-----------------------------------------------------------------------------
122 #-----------------------------------------------------------------------------
123
123
124
124
125 class IPEngineApp(ApplicationWithClusterDir):
125 class IPEngineApp(ApplicationWithClusterDir):
126
126
127 name = u'ipenginez'
127 name = u'ipenginez'
128 description = _description
128 description = _description
129 command_line_loader = IPEngineAppConfigLoader
129 command_line_loader = IPEngineAppConfigLoader
130 default_config_file_name = default_config_file_name
130 default_config_file_name = default_config_file_name
131 auto_create_cluster_dir = True
131 auto_create_cluster_dir = True
132
132
133 def create_default_config(self):
133 def create_default_config(self):
134 super(IPEngineApp, self).create_default_config()
134 super(IPEngineApp, self).create_default_config()
135
135
136 # The engine should not clean logs as we don't want to remove the
136 # The engine should not clean logs as we don't want to remove the
137 # active log files of other running engines.
137 # active log files of other running engines.
138 self.default_config.Global.clean_logs = False
138 self.default_config.Global.clean_logs = False
139 self.default_config.Global.secure = True
139 self.default_config.Global.secure = True
140
140
141 # Global config attributes
141 # Global config attributes
142 self.default_config.Global.exec_lines = []
142 self.default_config.Global.exec_lines = []
143 self.default_config.Global.extra_exec_lines = ''
143 self.default_config.Global.extra_exec_lines = ''
144
144
145 # Configuration related to the controller
145 # Configuration related to the controller
146 # This must match the filename (path not included) that the controller
146 # This must match the filename (path not included) that the controller
147 # used for the FURL file.
147 # used for the FURL file.
148 self.default_config.Global.url_file = u''
148 self.default_config.Global.url_file = u''
149 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
149 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
150 # If given, this is the actual location of the controller's FURL file.
150 # If given, this is the actual location of the controller's FURL file.
151 # If not, this is computed using the profile, app_dir and furl_file_name
151 # If not, this is computed using the profile, app_dir and furl_file_name
152 # self.default_config.Global.key_file_name = u'exec_key.key'
152 # self.default_config.Global.key_file_name = u'exec_key.key'
153 # self.default_config.Global.key_file = u''
153 # self.default_config.Global.key_file = u''
154
154
155 # MPI related config attributes
155 # MPI related config attributes
156 self.default_config.MPI.use = ''
156 self.default_config.MPI.use = ''
157 self.default_config.MPI.mpi4py = mpi4py_init
157 self.default_config.MPI.mpi4py = mpi4py_init
158 self.default_config.MPI.pytrilinos = pytrilinos_init
158 self.default_config.MPI.pytrilinos = pytrilinos_init
159
159
160 def post_load_command_line_config(self):
160 def post_load_command_line_config(self):
161 pass
161 pass
162
162
163 def pre_construct(self):
163 def pre_construct(self):
164 super(IPEngineApp, self).pre_construct()
164 super(IPEngineApp, self).pre_construct()
165 # self.find_cont_url_file()
165 # self.find_cont_url_file()
166 self.find_url_file()
166 self.find_url_file()
167 if self.master_config.Global.extra_exec_lines:
167 if self.master_config.Global.extra_exec_lines:
168 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
168 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
169
169
170 # def find_key_file(self):
170 # def find_key_file(self):
171 # """Set the key file.
171 # """Set the key file.
172 #
172 #
173 # Here we don't try to actually see if it exists or is valid as that
173 # Here we don't try to actually see if it exists or is valid as that
174 # is handled by the connection logic.
174 # is handled by the connection logic.
175 # """
175 # """
176 # config = self.master_config
176 # config = self.master_config
177 # # Find the actual controller key file
177 # # Find the actual controller key file
178 # if not config.Global.key_file:
178 # if not config.Global.key_file:
179 # try_this = os.path.join(
179 # try_this = os.path.join(
180 # config.Global.cluster_dir,
180 # config.Global.cluster_dir,
181 # config.Global.security_dir,
181 # config.Global.security_dir,
182 # config.Global.key_file_name
182 # config.Global.key_file_name
183 # )
183 # )
184 # config.Global.key_file = try_this
184 # config.Global.key_file = try_this
185
185
186 def find_url_file(self):
186 def find_url_file(self):
187 """Set the key file.
187 """Set the key file.
188
188
189 Here we don't try to actually see if it exists for is valid as that
189 Here we don't try to actually see if it exists for is valid as that
190 is hadled by the connection logic.
190 is hadled by the connection logic.
191 """
191 """
192 config = self.master_config
192 config = self.master_config
193 # Find the actual controller url file
193 # Find the actual controller url file
194 if not config.Global.url_file:
194 if not config.Global.url_file:
195 try_this = os.path.join(
195 try_this = os.path.join(
196 config.Global.cluster_dir,
196 config.Global.cluster_dir,
197 config.Global.security_dir,
197 config.Global.security_dir,
198 config.Global.url_file_name
198 config.Global.url_file_name
199 )
199 )
200 config.Global.url_file = try_this
200 config.Global.url_file = try_this
201
201
202 def construct(self):
202 def construct(self):
203 # This is the working dir by now.
203 # This is the working dir by now.
204 sys.path.insert(0, '')
204 sys.path.insert(0, '')
205 config = self.master_config
205 config = self.master_config
206 # if os.path.exists(config.Global.key_file) and config.Global.secure:
206 # if os.path.exists(config.Global.key_file) and config.Global.secure:
207 # config.SessionFactory.exec_key = config.Global.key_file
207 # config.SessionFactory.exec_key = config.Global.key_file
208 if os.path.exists(config.Global.url_file):
208 if os.path.exists(config.Global.url_file):
209 with open(config.Global.url_file) as f:
209 with open(config.Global.url_file) as f:
210 d = json.loads(f.read())
210 d = json.loads(f.read())
211 for k,v in d.iteritems():
211 for k,v in d.iteritems():
212 if isinstance(v, unicode):
212 if isinstance(v, unicode):
213 d[k] = v.encode()
213 d[k] = v.encode()
214 if d['exec_key']:
214 if d['exec_key']:
215 config.SessionFactory.exec_key = d['exec_key']
215 config.SessionFactory.exec_key = d['exec_key']
216 d['url'] = disambiguate_url(d['url'], d['location'])
216 d['url'] = disambiguate_url(d['url'], d['location'])
217 config.RegistrationFactory.url=d['url']
217 config.RegistrationFactory.url=d['url']
218 config.EngineFactory.location = d['location']
218 config.EngineFactory.location = d['location']
219
219
220
220
221
221
222 config.Kernel.exec_lines = config.Global.exec_lines
222 config.Kernel.exec_lines = config.Global.exec_lines
223
223
224 self.start_mpi()
224 self.start_mpi()
225
225
226 # Create the underlying shell class and EngineService
226 # Create the underlying shell class and EngineService
227 # shell_class = import_item(self.master_config.Global.shell_class)
227 # shell_class = import_item(self.master_config.Global.shell_class)
228 try:
228 try:
229 self.engine = EngineFactory(config=config, logname=self.log.name)
229 self.engine = EngineFactory(config=config, logname=self.log.name)
230 except:
230 except:
231 self.log.error("Couldn't start the Engine", exc_info=True)
231 self.log.error("Couldn't start the Engine", exc_info=True)
232 self.exit(1)
232 self.exit(1)
233
233
234 self.start_logging()
234 self.start_logging()
235
235
236 # Create the service hierarchy
236 # Create the service hierarchy
237 # self.main_service = service.MultiService()
237 # self.main_service = service.MultiService()
238 # self.engine_service.setServiceParent(self.main_service)
238 # self.engine_service.setServiceParent(self.main_service)
239 # self.tub_service = Tub()
239 # self.tub_service = Tub()
240 # self.tub_service.setServiceParent(self.main_service)
240 # self.tub_service.setServiceParent(self.main_service)
241 # # This needs to be called before the connection is initiated
241 # # This needs to be called before the connection is initiated
242 # self.main_service.startService()
242 # self.main_service.startService()
243
243
244 # This initiates the connection to the controller and calls
244 # This initiates the connection to the controller and calls
245 # register_engine to tell the controller we are ready to do work
245 # register_engine to tell the controller we are ready to do work
246 # self.engine_connector = EngineConnector(self.tub_service)
246 # self.engine_connector = EngineConnector(self.tub_service)
247
247
248 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
248 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
249
249
250 # reactor.callWhenRunning(self.call_connect)
250 # reactor.callWhenRunning(self.call_connect)
251
251
252
252
253 def start_logging(self):
253 def start_logging(self):
254 super(IPEngineApp, self).start_logging()
254 super(IPEngineApp, self).start_logging()
255 if self.master_config.Global.log_url:
255 if self.master_config.Global.log_url:
256 context = self.engine.context
256 context = self.engine.context
257 lsock = context.socket(zmq.PUB)
257 lsock = context.socket(zmq.PUB)
258 lsock.connect(self.master_config.Global.log_url)
258 lsock.connect(self.master_config.Global.log_url)
259 handler = EnginePUBHandler(self.engine, lsock)
259 handler = EnginePUBHandler(self.engine, lsock)
260 handler.setLevel(self.log_level)
260 handler.setLevel(self.log_level)
261 self.log.addHandler(handler)
261 self.log.addHandler(handler)
262
262
263 def start_mpi(self):
263 def start_mpi(self):
264 global mpi
264 global mpi
265 mpikey = self.master_config.MPI.use
265 mpikey = self.master_config.MPI.use
266 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
266 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
267 if mpi_import_statement is not None:
267 if mpi_import_statement is not None:
268 try:
268 try:
269 self.log.info("Initializing MPI:")
269 self.log.info("Initializing MPI:")
270 self.log.info(mpi_import_statement)
270 self.log.info(mpi_import_statement)
271 exec mpi_import_statement in globals()
271 exec mpi_import_statement in globals()
272 except:
272 except:
273 mpi = None
273 mpi = None
274 else:
274 else:
275 mpi = None
275 mpi = None
276
276
277
277
278 def start_app(self):
278 def start_app(self):
279 self.engine.start()
279 self.engine.start()
280 try:
280 try:
281 self.engine.loop.start()
281 self.engine.loop.start()
282 except KeyboardInterrupt:
282 except KeyboardInterrupt:
283 self.log.critical("Engine Interrupted, shutting down...\n")
283 self.log.critical("Engine Interrupted, shutting down...\n")
284
284
285
285
286 def launch_new_instance():
286 def launch_new_instance():
287 """Create and run the IPython controller"""
287 """Create and run the IPython controller"""
288 app = IPEngineApp()
288 app = IPEngineApp()
289 app.start()
289 app.start()
290
290
291
291
292 if __name__ == '__main__':
292 if __name__ == '__main__':
293 launch_new_instance()
293 launch_new_instance()
294
294
@@ -1,132 +1,132 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.zmq.parallel.clusterdir import (
23 from IPython.zmq.parallel.clusterdir import (
24 ApplicationWithClusterDir,
24 ApplicationWithClusterDir,
25 ClusterDirConfigLoader
25 ClusterDirConfigLoader
26 )
26 )
27 from .logwatcher import LogWatcher
27 from .logwatcher import LogWatcher
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Module level variables
30 # Module level variables
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33 #: The default config file name for this application
33 #: The default config file name for this application
34 default_config_file_name = u'iplogger_config.py'
34 default_config_file_name = u'iplogger_config.py'
35
35
36 _description = """Start an IPython logger for parallel computing.\n\n
36 _description = """Start an IPython logger for parallel computing.\n\n
37
37
38 IPython controllers and engines (and your own processes) can broadcast log messages
38 IPython controllers and engines (and your own processes) can broadcast log messages
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 logger can be configured using command line options or using a cluster
40 logger can be configured using command line options or using a cluster
41 directory. Cluster directories contain config, log and security files and are
41 directory. Cluster directories contain config, log and security files and are
42 usually located in your ipython directory and named as "cluster_<profile>".
42 usually located in your ipython directory and named as "clusterz_<profile>".
43 See the --profile and --cluster-dir options for details.
43 See the --profile and --cluster-dir options for details.
44 """
44 """
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Command line options
47 # Command line options
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
52
53 def _add_arguments(self):
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
55 paa = self.parser.add_argument
56 # Controller config
56 # Controller config
57 paa('--url',
57 paa('--url',
58 type=str, dest='LogWatcher.url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
59 help='The url the LogWatcher will listen on',
60 )
60 )
61 # MPI
61 # MPI
62 paa('--topics',
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
64 help='What topics to subscribe to',
65 metavar='topics')
65 metavar='topics')
66 # Global config
66 # Global config
67 paa('--log-to-file',
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
69 help='Log to a file in the log directory (default is stdout)')
70
70
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Main application
73 # Main application
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 class IPLoggerApp(ApplicationWithClusterDir):
77 class IPLoggerApp(ApplicationWithClusterDir):
78
78
79 name = u'iploggerz'
79 name = u'iploggerz'
80 description = _description
80 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
82 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
83 auto_create_cluster_dir = True
84
84
85 def create_default_config(self):
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
86 super(IPLoggerApp, self).create_default_config()
87
87
88 # The engine should not clean logs as we don't want to remove the
88 # The engine should not clean logs as we don't want to remove the
89 # active log files of other running engines.
89 # active log files of other running engines.
90 self.default_config.Global.clean_logs = False
90 self.default_config.Global.clean_logs = False
91
91
92 # If given, this is the actual location of the logger's URL file.
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and furl_file_name
93 # If not, this is computed using the profile, app_dir and furl_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
95 self.default_config.Global.url_file = u''
96
96
97 def post_load_command_line_config(self):
97 def post_load_command_line_config(self):
98 pass
98 pass
99
99
100 def pre_construct(self):
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
101 super(IPLoggerApp, self).pre_construct()
102
102
103 def construct(self):
103 def construct(self):
104 # This is the working dir by now.
104 # This is the working dir by now.
105 sys.path.insert(0, '')
105 sys.path.insert(0, '')
106
106
107 self.start_logging()
107 self.start_logging()
108
108
109 try:
109 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 except:
111 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
113 self.exit(1)
114
114
115
115
116 def start_app(self):
116 def start_app(self):
117 try:
117 try:
118 self.watcher.start()
118 self.watcher.start()
119 self.watcher.loop.start()
119 self.watcher.loop.start()
120 except KeyboardInterrupt:
120 except KeyboardInterrupt:
121 self.log.critical("Logging Interrupted, shutting down...\n")
121 self.log.critical("Logging Interrupted, shutting down...\n")
122
122
123
123
124 def launch_new_instance():
124 def launch_new_instance():
125 """Create and run the IPython LogWatcher"""
125 """Create and run the IPython LogWatcher"""
126 app = IPLoggerApp()
126 app = IPLoggerApp()
127 app.start()
127 app.start()
128
128
129
129
130 if __name__ == '__main__':
130 if __name__ == '__main__':
131 launch_new_instance()
131 launch_new_instance()
132
132
@@ -1,844 +1,847 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import logging
18 import logging
19 import os
19 import os
20 import re
20 import re
21 import sys
21 import sys
22
22
23 from signal import SIGINT, SIGTERM
23 from signal import SIGINT, SIGTERM
24 try:
24 try:
25 from signal import SIGKILL
25 from signal import SIGKILL
26 except ImportError:
26 except ImportError:
27 SIGKILL=SIGTERM
27 SIGKILL=SIGTERM
28
28
29 from subprocess import Popen, PIPE, STDOUT
29 from subprocess import Popen, PIPE, STDOUT
30 try:
30 try:
31 from subprocess import check_output
31 from subprocess import check_output
32 except ImportError:
32 except ImportError:
33 # pre-2.7:
33 # pre-2.7:
34 from StringIO import StringIO
34 from StringIO import StringIO
35
35
36 def check_output(*args, **kwargs):
36 def check_output(*args, **kwargs):
37 sio = StringIO()
37 sio = StringIO()
38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 p = Popen(*args, **kwargs)
39 p = Popen(*args, **kwargs)
40 out,err = p.communicate()
40 out,err = p.communicate()
41 return out
41 return out
42
42
43 from zmq.eventloop import ioloop
43 from zmq.eventloop import ioloop
44
44
45 from IPython.external import Itpl
45 from IPython.external import Itpl
46 # from IPython.config.configurable import Configurable
46 # from IPython.config.configurable import Configurable
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Dict, Instance
48 from IPython.utils.path import get_ipython_module_path
48 from IPython.utils.path import get_ipython_module_path
49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
50
50
51 from .factory import LoggingFactory
51 from .factory import LoggingFactory
52
52
53 # load winhpcjob from IPython.kernel
53 # load winhpcjob from IPython.kernel
54 try:
54 try:
55 from IPython.kernel.winhpcjob import (
55 from IPython.kernel.winhpcjob import (
56 IPControllerTask, IPEngineTask,
56 IPControllerTask, IPEngineTask,
57 IPControllerJob, IPEngineSetJob
57 IPControllerJob, IPEngineSetJob
58 )
58 )
59 except ImportError:
59 except ImportError:
60 pass
60 pass
61
61
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Paths to the kernel apps
64 # Paths to the kernel apps
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67
67
68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
69 'IPython.zmq.parallel.ipclusterapp'
69 'IPython.zmq.parallel.ipclusterapp'
70 ))
70 ))
71
71
72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
73 'IPython.zmq.parallel.ipengineapp'
73 'IPython.zmq.parallel.ipengineapp'
74 ))
74 ))
75
75
76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
77 'IPython.zmq.parallel.ipcontrollerapp'
77 'IPython.zmq.parallel.ipcontrollerapp'
78 ))
78 ))
79
79
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81 # Base launchers and errors
81 # Base launchers and errors
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83
83
84
84
85 class LauncherError(Exception):
85 class LauncherError(Exception):
86 pass
86 pass
87
87
88
88
89 class ProcessStateError(LauncherError):
89 class ProcessStateError(LauncherError):
90 pass
90 pass
91
91
92
92
93 class UnknownStatus(LauncherError):
93 class UnknownStatus(LauncherError):
94 pass
94 pass
95
95
96
96
97 class BaseLauncher(LoggingFactory):
97 class BaseLauncher(LoggingFactory):
98 """An asbtraction for starting, stopping and signaling a process."""
98 """An asbtraction for starting, stopping and signaling a process."""
99
99
100 # In all of the launchers, the work_dir is where child processes will be
100 # In all of the launchers, the work_dir is where child processes will be
101 # run. This will usually be the cluster_dir, but may not be. any work_dir
101 # run. This will usually be the cluster_dir, but may not be. any work_dir
102 # passed into the __init__ method will override the config value.
102 # passed into the __init__ method will override the config value.
103 # This should not be used to set the work_dir for the actual engine
103 # This should not be used to set the work_dir for the actual engine
104 # and controller. Instead, use their own config files or the
104 # and controller. Instead, use their own config files or the
105 # controller_args, engine_args attributes of the launchers to add
105 # controller_args, engine_args attributes of the launchers to add
106 # the --work-dir option.
106 # the --work-dir option.
107 work_dir = Unicode(u'.')
107 work_dir = Unicode(u'.')
108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
109 def _loop_default(self):
109 def _loop_default(self):
110 return ioloop.IOLoop.instance()
110 return ioloop.IOLoop.instance()
111
111
112 def __init__(self, work_dir=u'.', config=None, **kwargs):
112 def __init__(self, work_dir=u'.', config=None, **kwargs):
113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
114 self.state = 'before' # can be before, running, after
114 self.state = 'before' # can be before, running, after
115 self.stop_callbacks = []
115 self.stop_callbacks = []
116 self.start_data = None
116 self.start_data = None
117 self.stop_data = None
117 self.stop_data = None
118
118
119 @property
119 @property
120 def args(self):
120 def args(self):
121 """A list of cmd and args that will be used to start the process.
121 """A list of cmd and args that will be used to start the process.
122
122
123 This is what is passed to :func:`spawnProcess` and the first element
123 This is what is passed to :func:`spawnProcess` and the first element
124 will be the process name.
124 will be the process name.
125 """
125 """
126 return self.find_args()
126 return self.find_args()
127
127
128 def find_args(self):
128 def find_args(self):
129 """The ``.args`` property calls this to find the args list.
129 """The ``.args`` property calls this to find the args list.
130
130
131 Subclasses should implement this to construct the cmd and args.
131 Subclasses should implement this to construct the cmd and args.
132 """
132 """
133 raise NotImplementedError('find_args must be implemented in a subclass')
133 raise NotImplementedError('find_args must be implemented in a subclass')
134
134
135 @property
135 @property
136 def arg_str(self):
136 def arg_str(self):
137 """The string form of the program arguments."""
137 """The string form of the program arguments."""
138 return ' '.join(self.args)
138 return ' '.join(self.args)
139
139
140 @property
140 @property
141 def running(self):
141 def running(self):
142 """Am I running."""
142 """Am I running."""
143 if self.state == 'running':
143 if self.state == 'running':
144 return True
144 return True
145 else:
145 else:
146 return False
146 return False
147
147
148 def start(self):
148 def start(self):
149 """Start the process.
149 """Start the process.
150
150
151 This must return a deferred that fires with information about the
151 This must return a deferred that fires with information about the
152 process starting (like a pid, job id, etc.).
152 process starting (like a pid, job id, etc.).
153 """
153 """
154 raise NotImplementedError('start must be implemented in a subclass')
154 raise NotImplementedError('start must be implemented in a subclass')
155
155
156 def stop(self):
156 def stop(self):
157 """Stop the process and notify observers of stopping.
157 """Stop the process and notify observers of stopping.
158
158
159 This must return a deferred that fires with information about the
159 This must return a deferred that fires with information about the
160 processing stopping, like errors that occur while the process is
160 processing stopping, like errors that occur while the process is
161 attempting to be shut down. This deferred won't fire when the process
161 attempting to be shut down. This deferred won't fire when the process
162 actually stops. To observe the actual process stopping, see
162 actually stops. To observe the actual process stopping, see
163 :func:`observe_stop`.
163 :func:`observe_stop`.
164 """
164 """
165 raise NotImplementedError('stop must be implemented in a subclass')
165 raise NotImplementedError('stop must be implemented in a subclass')
166
166
167 def on_stop(self, f):
167 def on_stop(self, f):
168 """Get a deferred that will fire when the process stops.
168 """Get a deferred that will fire when the process stops.
169
169
170 The deferred will fire with data that contains information about
170 The deferred will fire with data that contains information about
171 the exit status of the process.
171 the exit status of the process.
172 """
172 """
173 if self.state=='after':
173 if self.state=='after':
174 return f(self.stop_data)
174 return f(self.stop_data)
175 else:
175 else:
176 self.stop_callbacks.append(f)
176 self.stop_callbacks.append(f)
177
177
178 def notify_start(self, data):
178 def notify_start(self, data):
179 """Call this to trigger startup actions.
179 """Call this to trigger startup actions.
180
180
181 This logs the process startup and sets the state to 'running'. It is
181 This logs the process startup and sets the state to 'running'. It is
182 a pass-through so it can be used as a callback.
182 a pass-through so it can be used as a callback.
183 """
183 """
184
184
185 self.log.info('Process %r started: %r' % (self.args[0], data))
185 self.log.info('Process %r started: %r' % (self.args[0], data))
186 self.start_data = data
186 self.start_data = data
187 self.state = 'running'
187 self.state = 'running'
188 return data
188 return data
189
189
190 def notify_stop(self, data):
190 def notify_stop(self, data):
191 """Call this to trigger process stop actions.
191 """Call this to trigger process stop actions.
192
192
193 This logs the process stopping and sets the state to 'after'. Call
193 This logs the process stopping and sets the state to 'after'. Call
194 this to trigger all the deferreds from :func:`observe_stop`."""
194 this to trigger all the deferreds from :func:`observe_stop`."""
195
195
196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
197 self.stop_data = data
197 self.stop_data = data
198 self.state = 'after'
198 self.state = 'after'
199 for i in range(len(self.stop_callbacks)):
199 for i in range(len(self.stop_callbacks)):
200 d = self.stop_callbacks.pop()
200 d = self.stop_callbacks.pop()
201 d(data)
201 d(data)
202 return data
202 return data
203
203
204 def signal(self, sig):
204 def signal(self, sig):
205 """Signal the process.
205 """Signal the process.
206
206
207 Return a semi-meaningless deferred after signaling the process.
207 Return a semi-meaningless deferred after signaling the process.
208
208
209 Parameters
209 Parameters
210 ----------
210 ----------
211 sig : str or int
211 sig : str or int
212 'KILL', 'INT', etc., or any signal number
212 'KILL', 'INT', etc., or any signal number
213 """
213 """
214 raise NotImplementedError('signal must be implemented in a subclass')
214 raise NotImplementedError('signal must be implemented in a subclass')
215
215
216
216
217 #-----------------------------------------------------------------------------
217 #-----------------------------------------------------------------------------
218 # Local process launchers
218 # Local process launchers
219 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
220
220
221
221
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    The child is spawned with ``Popen``, its stdout/stderr file descriptors
    are hooked into the ioloop, and a periodic callback polls for exit.
    The external process runs with a working directory of ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to spawnProcess.
    cmd_and_args = List([])
    poll_frequency = Int(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.start_deferred = None
        self.poller = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Spawn the child process and wire its output into the ioloop."""
        if self.state != 'before':
            # Refuse to start twice from the same launcher instance.
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)
        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
        # Poll periodically for exit, since fd readability alone does not
        # tell us when the child has terminated.
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send *sig* to the child, but only while it is still running."""
        if self.state == 'running':
            self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        self.signal(SIGINT)
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.info(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.error(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check the child for exit; on exit, unhook handlers and notify."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.process.stdout.fileno())
            self.loop.remove_handler(self.process.stderr.fileno())
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
301
301
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)

    def find_args(self):
        """The full argv: the ipcontroller command plus its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
318
318
319
319
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )

    def find_args(self):
        """The full argv: the ipengine command plus its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, cluster_dir):
        """Start the engine by cluster_dir."""
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
337
337
338
338
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    One ``launcher_class`` instance is created per engine; this object
    aggregates their start results and stop notifications.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    # launcher class used for each individual engine
    launcher_class = LocalEngineLauncher

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.launchers = {}
        self.stop_data = {}

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        import copy  # hoisted out of the loop below
        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
            # Copy the engine args over to each engine launcher so that
            # per-launcher mutations (e.g. --cluster-dir) don't leak.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(cluster_dir)
            if i == 0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Forward *sig* to every engine launcher."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt-then-kill every engine launcher."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        return dlist

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Record one engine's stop; fire notify_stop when all are gone.

        Only the launcher whose process pid matches ``data['pid']`` is
        removed.  (The previous version left a debug print in place and
        raised NameError / popped an arbitrary launcher when no pid
        matched.)
        """
        pid = data['pid']
        for idx, el in self.launchers.iteritems():
            if el.process.pid == pid:
                self.launchers.pop(idx)
                self.stop_data[idx] = data
                break
        if not self.launchers:
            self.notify_stop(self.stop_data)
409
409
410
410
411 #-----------------------------------------------------------------------------
411 #-----------------------------------------------------------------------------
412 # MPIExec launchers
412 # MPIExec launchers
413 #-----------------------------------------------------------------------------
413 #-----------------------------------------------------------------------------
414
414
415
415
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields.

        ``self.n`` is an int but Popen requires every argv element to be
        a string, so it is converted with ``str()`` here.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
439
439
440
440
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        # str(self.n): Popen requires string argv elements, not ints.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
459
459
460
460
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        # str(self.n): Popen requires string argv elements, not ints.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.engine_cmd + self.engine_args
481
481
482
482
483 #-----------------------------------------------------------------------------
483 #-----------------------------------------------------------------------------
484 # SSH launchers
484 # SSH launchers
485 #-----------------------------------------------------------------------------
485 #-----------------------------------------------------------------------------
486
486
487 # TODO: Get SSH Launcher working again.
487 # TODO: Get SSH Launcher working again.
488
488
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List(['-tt'], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str(os.environ.get('USER','username'), config=True)
    # 'user@hostname' target, kept in sync by the change handlers below.
    location = Str('')

    def _hostname_changed(self, name, old, new):
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, cluster_dir, hostname=None, user=None):
        """Start the program over ssh, optionally overriding host and user.

        (Leftover debug ``print`` statements of the config and target
        location have been removed.)
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()

    def signal(self, sig):
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
529
529
530
530
531
531
class SSHControllerLauncher(SSHLauncher):
    """Launch an ipcontroller over ssh."""

    program = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
537
537
538
538
class SSHEngineLauncher(SSHLauncher):
    """Launch an ipengine over ssh."""

    program = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
545
545
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines, each over its own ssh connection."""

    launcher_class = SSHEngineLauncher
548
548
549
549
550 #-----------------------------------------------------------------------------
550 #-----------------------------------------------------------------------------
551 # Windows HPC Server 2008 scheduler launchers
551 # Windows HPC Server 2008 scheduler launchers
552 #-----------------------------------------------------------------------------
552 #-----------------------------------------------------------------------------
553
553
554
554
# This is only used on Windows.
def find_job_cmd():
    """Return the HPC 'job' command, resolved via find_cmd on Windows.

    On non-Windows platforms (and when resolution fails) the bare
    string 'job' is returned.
    """
    if os.name != 'nt':
        return 'job'
    try:
        return find_cmd('job')
    except FindCmdError:
        return 'job'
564
564
565
565
class WindowsHPCLauncher(BaseLauncher):
    """Base class for launchers that submit jobs to Windows HPC Server 2008."""

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the job_file property below,
    # so it is effectively dead — kept for trait-machinery compatibility.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = Str('', config=True)
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path to the instantiated job script (work_dir + name)."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        # Twisted will raise DeprecationWarnings if we try to pass unicode to this
        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        # self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job; best-effort if it is already gone."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # was a bare except: — narrowed so SystemExit/KeyboardInterrupt
            # propagate; cancel failures stay best-effort.
            # (also fixes the 'stoppped' typo in the message)
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        return output
642
642
643
643
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a single-task controller job description file."""
        job = IPControllerJob(config=self.config)

        task = IPControllerTask(config=self.config)
        # The task's work directory is *not* the actual work directory of
        # the controller.  It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.cluster_dir
        # Add the --cluster-dir arguments recorded by self.start().
        task.controller_args.extend(self.extra_args)
        job.add_task(task)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Job file lives inside the cluster dir, not work_dir."""
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
673
673
674
674
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of IPython engines via the Windows HPC 2008 scheduler."""

    # The filename of the XML job description written for the scheduler.
    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    # Extra command-line args for the engines; filled in by start(),
    # deliberately not user-configurable.
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Build and write the XML job description file with one task per engine."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The task's work directory is *not* the actual work directory of
            # the engine.  It is only the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.cluster_dir
            # Append the --cluster-dir arguments that start() stored.
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job description file inside the cluster dir."""
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start ``n`` engines by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
705
705
706
706
707 #-----------------------------------------------------------------------------
707 #-----------------------------------------------------------------------------
708 # Batch (PBS) system launchers
708 # Batch (PBS) system launchers
709 #-----------------------------------------------------------------------------
709 #-----------------------------------------------------------------------------
710
710
711 # TODO: Get PBS launcher working again.
711 # TODO: Get PBS launcher working again.
712
712
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script.  The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl.  Thus the template can use
    ${n} for the number of instances.  Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet.
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # The format dict used to instantiate batch_template.
    context = Dict()

    def find_args(self):
        return [self.submit_command]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if ``job_id_regexp`` does not match ``output``.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)
        # Context manager guarantees the file is closed even if write fails.
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)

    def start(self, n, cluster_dir):
        """Start n copies of the process using a batch system."""
        # Save cluster_dir in the context so it can be used in the batch
        # script template as ${cluster_dir}.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        self.write_batch_script(n)
        # check_output captures stdout itself; merge stderr into it with
        # stderr=STDOUT (passing stdout= to check_output raises ValueError),
        # matching stop() below.
        output = check_output([self.submit_command, self.batch_file], env=os.environ, stderr=STDOUT)
        job_id = self.parse_job_id(output)
        # self.notify_start(job_id)
        return job_id

    def stop(self):
        output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
        self.notify_stop(output)  # Pass the output of the kill cmd
        return output
781
791
782
792
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # PBS job submission / deletion commands.
    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # qsub prints "<jobnum>.<server>"; the leading digits are the job id.
    job_id_regexp = Str(r'\d+', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
792
802
793
803
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by profile or cluster_dir."""
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        # A controller is a single process; the base class records
        # cluster_dir in the template context.
        return super(PBSControllerLauncher, self).start(1, cluster_dir)
808
813
809
814
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
820
823
821
824
822 #-----------------------------------------------------------------------------
825 #-----------------------------------------------------------------------------
823 # A launcher for ipcluster itself!
826 # A launcher for ipcluster itself!
824 #-----------------------------------------------------------------------------
827 #-----------------------------------------------------------------------------
825
828
826
829
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # The argv prefix used to invoke ipcluster.
    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
    # Subcommand and default engine count for the spawned ipcluster.
    ipcluster_subcommand = Str('start')
    ipcluster_n = Int(2)

    def find_args(self):
        return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
            ['-n', repr(self.ipcluster_n)] + self.ipcluster_args

    def start(self):
        self.log.info("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
844
847
General Comments 0
You need to be logged in to leave comments. Login now