cleanup aliases in parallel apps...
MinRK
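For context: these aliases map short command-line names onto traitlets configurables. After this cleanup the parallel apps share one base table instead of restating entries per app: the duplicated 'log_level' entry is dropped from base_aliases (it already comes from base_ip_aliases), ipcluster's 'elauncher' alias is renamed to 'engines', a 'controller' alias is added for 'ipcluster start', and ipcontroller's aliases move to a module-level dict merged with base_aliases. A minimal usage sketch follows; the MPIExec launcher class names are assumptions used only for illustration.

# Hypothetical usage sketch (not part of the diff): the renamed/added aliases
# can be given on the command line, e.g.
#
#   ipcluster start n=4 engines=MPIExecEngineSetLauncher controller=MPIExecControllerLauncher
#
# which populates the same traits as configuring them directly:
from IPython.config.loader import Config

c = Config()
c.IPClusterEngines.n = 4                                                  # alias: n
c.IPClusterEngines.engine_launcher_class = 'MPIExecEngineSetLauncher'     # alias: engines (assumed class name)
c.IPClusterStart.controller_launcher_class = 'MPIExecControllerLauncher'  # alias: controller (assumed class name)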
@@ -1,267 +1,266 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The Base Application class for IPython.parallel apps
4 The Base Application class for IPython.parallel apps
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * Min RK
9 * Min RK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import os
26 import os
27 import logging
27 import logging
28 import re
28 import re
29 import sys
29 import sys
30
30
31 from subprocess import Popen, PIPE
31 from subprocess import Popen, PIPE
32
32
33 from IPython.core import release
33 from IPython.core import release
34 from IPython.core.crashhandler import CrashHandler
34 from IPython.core.crashhandler import CrashHandler
35 from IPython.core.application import (
35 from IPython.core.application import (
36 BaseIPythonApplication,
36 BaseIPythonApplication,
37 base_aliases as base_ip_aliases,
37 base_aliases as base_ip_aliases,
38 base_flags as base_ip_flags
38 base_flags as base_ip_flags
39 )
39 )
40 from IPython.utils.path import expand_path
40 from IPython.utils.path import expand_path
41
41
42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
43
43
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45 # Module errors
45 # Module errors
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47
47
48 class PIDFileError(Exception):
48 class PIDFileError(Exception):
49 pass
49 pass
50
50
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # Crash handler for this application
53 # Crash handler for this application
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56
56
57 _message_template = """\
57 _message_template = """\
58 Oops, $self.app_name crashed. We do our best to make it stable, but...
58 Oops, $self.app_name crashed. We do our best to make it stable, but...
59
59
60 A crash report was automatically generated with the following information:
60 A crash report was automatically generated with the following information:
61 - A verbatim copy of the crash traceback.
61 - A verbatim copy of the crash traceback.
62 - Data on your current $self.app_name configuration.
62 - Data on your current $self.app_name configuration.
63
63
64 It was left in the file named:
64 It was left in the file named:
65 \t'$self.crash_report_fname'
65 \t'$self.crash_report_fname'
66 If you can email this file to the developers, the information in it will help
66 If you can email this file to the developers, the information in it will help
67 them in understanding and correcting the problem.
67 them in understanding and correcting the problem.
68
68
69 You can mail it to: $self.contact_name at $self.contact_email
69 You can mail it to: $self.contact_name at $self.contact_email
70 with the subject '$self.app_name Crash Report'.
70 with the subject '$self.app_name Crash Report'.
71
71
72 If you want to do it now, the following command will work (under Unix):
72 If you want to do it now, the following command will work (under Unix):
73 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
73 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
74
74
75 To ensure accurate tracking of this issue, please file a report about it at:
75 To ensure accurate tracking of this issue, please file a report about it at:
76 $self.bug_tracker
76 $self.bug_tracker
77 """
77 """
78
78
79 class ParallelCrashHandler(CrashHandler):
79 class ParallelCrashHandler(CrashHandler):
80 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
80 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
81
81
82 message_template = _message_template
82 message_template = _message_template
83
83
84 def __init__(self, app):
84 def __init__(self, app):
85 contact_name = release.authors['Min'][0]
85 contact_name = release.authors['Min'][0]
86 contact_email = release.authors['Min'][1]
86 contact_email = release.authors['Min'][1]
87 bug_tracker = 'http://github.com/ipython/ipython/issues'
87 bug_tracker = 'http://github.com/ipython/ipython/issues'
88 super(ParallelCrashHandler,self).__init__(
88 super(ParallelCrashHandler,self).__init__(
89 app, contact_name, contact_email, bug_tracker
89 app, contact_name, contact_email, bug_tracker
90 )
90 )
91
91
92
92
93 #-----------------------------------------------------------------------------
93 #-----------------------------------------------------------------------------
94 # Main application
94 # Main application
95 #-----------------------------------------------------------------------------
95 #-----------------------------------------------------------------------------
96 base_aliases = {}
96 base_aliases = {}
97 base_aliases.update(base_ip_aliases)
97 base_aliases.update(base_ip_aliases)
98 base_aliases.update({
98 base_aliases.update({
99 'profile_dir' : 'ProfileDir.location',
99 'profile_dir' : 'ProfileDir.location',
100 'log_level' : 'BaseParallelApplication.log_level',
101 'work_dir' : 'BaseParallelApplication.work_dir',
100 'work_dir' : 'BaseParallelApplication.work_dir',
102 'log_to_file' : 'BaseParallelApplication.log_to_file',
101 'log_to_file' : 'BaseParallelApplication.log_to_file',
103 'clean_logs' : 'BaseParallelApplication.clean_logs',
102 'clean_logs' : 'BaseParallelApplication.clean_logs',
104 'log_url' : 'BaseParallelApplication.log_url',
103 'log_url' : 'BaseParallelApplication.log_url',
105 })
104 })
106
105
107 base_flags = {
106 base_flags = {
108 'log-to-file' : (
107 'log-to-file' : (
109 {'BaseParallelApplication' : {'log_to_file' : True}},
108 {'BaseParallelApplication' : {'log_to_file' : True}},
110 "send log output to a file"
109 "send log output to a file"
111 )
110 )
112 }
111 }
113 base_flags.update(base_ip_flags)
112 base_flags.update(base_ip_flags)
114
113
115 class BaseParallelApplication(BaseIPythonApplication):
114 class BaseParallelApplication(BaseIPythonApplication):
116 """The base Application for IPython.parallel apps
115 """The base Application for IPython.parallel apps
117
116
118 Principal extensions to BaseIPythonApplication:
117 Principal extensions to BaseIPythonApplication:
119
118
120 * work_dir
119 * work_dir
121 * remote logging via pyzmq
120 * remote logging via pyzmq
122 * IOLoop instance
121 * IOLoop instance
123 """
122 """
124
123
125 crash_handler_class = ParallelCrashHandler
124 crash_handler_class = ParallelCrashHandler
126
125
127 def _log_level_default(self):
126 def _log_level_default(self):
128 # temporarily override default_log_level to INFO
127 # temporarily override default_log_level to INFO
129 return logging.INFO
128 return logging.INFO
130
129
131 work_dir = Unicode(os.getcwdu(), config=True,
130 work_dir = Unicode(os.getcwdu(), config=True,
132 help='Set the working dir for the process.'
131 help='Set the working dir for the process.'
133 )
132 )
134 def _work_dir_changed(self, name, old, new):
133 def _work_dir_changed(self, name, old, new):
135 self.work_dir = unicode(expand_path(new))
134 self.work_dir = unicode(expand_path(new))
136
135
137 log_to_file = Bool(config=True,
136 log_to_file = Bool(config=True,
138 help="whether to log to a file")
137 help="whether to log to a file")
139
138
140 clean_logs = Bool(False, config=True,
139 clean_logs = Bool(False, config=True,
141 help="whether to cleanup old logfiles before starting")
140 help="whether to cleanup old logfiles before starting")
142
141
143 log_url = Unicode('', config=True,
142 log_url = Unicode('', config=True,
144 help="The ZMQ URL of the iplogger to aggregate logging.")
143 help="The ZMQ URL of the iplogger to aggregate logging.")
145
144
146 def _config_files_default(self):
145 def _config_files_default(self):
147 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
146 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
148
147
149 loop = Instance('zmq.eventloop.ioloop.IOLoop')
148 loop = Instance('zmq.eventloop.ioloop.IOLoop')
150 def _loop_default(self):
149 def _loop_default(self):
151 from zmq.eventloop.ioloop import IOLoop
150 from zmq.eventloop.ioloop import IOLoop
152 return IOLoop.instance()
151 return IOLoop.instance()
153
152
154 aliases = Dict(base_aliases)
153 aliases = Dict(base_aliases)
155 flags = Dict(base_flags)
154 flags = Dict(base_flags)
156
155
157 def initialize(self, argv=None):
156 def initialize(self, argv=None):
158 """initialize the app"""
157 """initialize the app"""
159 super(BaseParallelApplication, self).initialize(argv)
158 super(BaseParallelApplication, self).initialize(argv)
160 self.to_work_dir()
159 self.to_work_dir()
161 self.reinit_logging()
160 self.reinit_logging()
162
161
163 def to_work_dir(self):
162 def to_work_dir(self):
164 wd = self.work_dir
163 wd = self.work_dir
165 if unicode(wd) != os.getcwdu():
164 if unicode(wd) != os.getcwdu():
166 os.chdir(wd)
165 os.chdir(wd)
167 self.log.info("Changing to working dir: %s" % wd)
166 self.log.info("Changing to working dir: %s" % wd)
168 # This is the working dir by now.
167 # This is the working dir by now.
169 sys.path.insert(0, '')
168 sys.path.insert(0, '')
170
169
171 def reinit_logging(self):
170 def reinit_logging(self):
172 # Remove old log files
171 # Remove old log files
173 log_dir = self.profile_dir.log_dir
172 log_dir = self.profile_dir.log_dir
174 if self.clean_logs:
173 if self.clean_logs:
175 for f in os.listdir(log_dir):
174 for f in os.listdir(log_dir):
176 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
175 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
177 os.remove(os.path.join(log_dir, f))
176 os.remove(os.path.join(log_dir, f))
178 if self.log_to_file:
177 if self.log_to_file:
179 # Start logging to the new log file
178 # Start logging to the new log file
180 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
179 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
181 logfile = os.path.join(log_dir, log_filename)
180 logfile = os.path.join(log_dir, log_filename)
182 open_log_file = open(logfile, 'w')
181 open_log_file = open(logfile, 'w')
183 else:
182 else:
184 open_log_file = None
183 open_log_file = None
185 if open_log_file is not None:
184 if open_log_file is not None:
186 self.log.removeHandler(self._log_handler)
185 self.log.removeHandler(self._log_handler)
187 self._log_handler = logging.StreamHandler(open_log_file)
186 self._log_handler = logging.StreamHandler(open_log_file)
188 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
187 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
189 self._log_handler.setFormatter(self._log_formatter)
188 self._log_handler.setFormatter(self._log_formatter)
190 self.log.addHandler(self._log_handler)
189 self.log.addHandler(self._log_handler)
191
190
192 def write_pid_file(self, overwrite=False):
191 def write_pid_file(self, overwrite=False):
193 """Create a .pid file in the pid_dir with my pid.
192 """Create a .pid file in the pid_dir with my pid.
194
193
195 This must be called after pre_construct, which sets `self.pid_dir`.
194 This must be called after pre_construct, which sets `self.pid_dir`.
196 This raises :exc:`PIDFileError` if the pid file exists already.
195 This raises :exc:`PIDFileError` if the pid file exists already.
197 """
196 """
198 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
197 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
199 if os.path.isfile(pid_file):
198 if os.path.isfile(pid_file):
200 pid = self.get_pid_from_file()
199 pid = self.get_pid_from_file()
201 if not overwrite:
200 if not overwrite:
202 raise PIDFileError(
201 raise PIDFileError(
203 'The pid file [%s] already exists. \nThis could mean that this '
202 'The pid file [%s] already exists. \nThis could mean that this '
204 'server is already running with [pid=%s].' % (pid_file, pid)
203 'server is already running with [pid=%s].' % (pid_file, pid)
205 )
204 )
206 with open(pid_file, 'w') as f:
205 with open(pid_file, 'w') as f:
207 self.log.info("Creating pid file: %s" % pid_file)
206 self.log.info("Creating pid file: %s" % pid_file)
208 f.write(repr(os.getpid())+'\n')
207 f.write(repr(os.getpid())+'\n')
209
208
210 def remove_pid_file(self):
209 def remove_pid_file(self):
211 """Remove the pid file.
210 """Remove the pid file.
212
211
213 This should be called at shutdown by registering a callback with
212 This should be called at shutdown by registering a callback with
214 :func:`reactor.addSystemEventTrigger`. This needs to return
213 :func:`reactor.addSystemEventTrigger`. This needs to return
215 ``None``.
214 ``None``.
216 """
215 """
217 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
216 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
218 if os.path.isfile(pid_file):
217 if os.path.isfile(pid_file):
219 try:
218 try:
220 self.log.info("Removing pid file: %s" % pid_file)
219 self.log.info("Removing pid file: %s" % pid_file)
221 os.remove(pid_file)
220 os.remove(pid_file)
222 except:
221 except:
223 self.log.warn("Error removing the pid file: %s" % pid_file)
222 self.log.warn("Error removing the pid file: %s" % pid_file)
224
223
225 def get_pid_from_file(self):
224 def get_pid_from_file(self):
226 """Get the pid from the pid file.
225 """Get the pid from the pid file.
227
226
228 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
227 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
229 """
228 """
230 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
229 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
231 if os.path.isfile(pid_file):
230 if os.path.isfile(pid_file):
232 with open(pid_file, 'r') as f:
231 with open(pid_file, 'r') as f:
233 s = f.read().strip()
232 s = f.read().strip()
234 try:
233 try:
235 pid = int(s)
234 pid = int(s)
236 except:
235 except:
237 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
236 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
238 return pid
237 return pid
239 else:
238 else:
240 raise PIDFileError('pid file not found: %s' % pid_file)
239 raise PIDFileError('pid file not found: %s' % pid_file)
241
240
242 def check_pid(self, pid):
241 def check_pid(self, pid):
243 if os.name == 'nt':
242 if os.name == 'nt':
244 try:
243 try:
245 import ctypes
244 import ctypes
246 # returns 0 if no such process (of ours) exists
245 # returns 0 if no such process (of ours) exists
247 # positive int otherwise
246 # positive int otherwise
248 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
247 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
249 except Exception:
248 except Exception:
250 self.log.warn(
249 self.log.warn(
251 "Could not determine whether pid %i is running via `OpenProcess`. "
250 "Could not determine whether pid %i is running via `OpenProcess`. "
252 " Making the likely assumption that it is."%pid
251 " Making the likely assumption that it is."%pid
253 )
252 )
254 return True
253 return True
255 return bool(p)
254 return bool(p)
256 else:
255 else:
257 try:
256 try:
258 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
257 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
259 output,_ = p.communicate()
258 output,_ = p.communicate()
260 except OSError:
259 except OSError:
261 self.log.warn(
260 self.log.warn(
262 "Could not determine whether pid %i is running via `ps x`. "
261 "Could not determine whether pid %i is running via `ps x`. "
263 " Making the likely assumption that it is."%pid
262 " Making the likely assumption that it is."%pid
264 )
263 )
265 return True
264 return True
266 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
265 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
267 return pid in pids
266 return pid in pids
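For illustration, the pattern this commit moves toward is that each parallel app extends the shared base_aliases/base_flags tables above rather than restating entries the base tables already provide. A minimal sketch with a hypothetical subclass (MyParallelApp and its 'interval' trait are invented for the example):

from IPython.utils.traitlets import Dict, Int
from IPython.parallel.apps.baseapp import (
    BaseParallelApplication, base_aliases, base_flags,
)

# Extend, don't redefine: log_level, profile, work_dir, etc. already come
# from base_aliases (and base_ip_aliases underneath it).
my_aliases = dict(base_aliases)
my_aliases['interval'] = 'MyParallelApp.interval'

class MyParallelApp(BaseParallelApplication):
    name = u'myapp'
    interval = Int(5, config=True, help="hypothetical: seconds between polls")
    aliases = Dict(my_aliases)
    flags = Dict(base_flags)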
@@ -1,458 +1,459 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
40 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 DottedObjectName)
41 DottedObjectName)
42
42
43 from IPython.parallel.apps.baseapp import (
43 from IPython.parallel.apps.baseapp import (
44 BaseParallelApplication,
44 BaseParallelApplication,
45 PIDFileError,
45 PIDFileError,
46 base_flags, base_aliases
46 base_flags, base_aliases
47 )
47 )
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Module level variables
51 # Module level variables
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53
53
54
54
55 default_config_file_name = u'ipcluster_config.py'
55 default_config_file_name = u'ipcluster_config.py'
56
56
57
57
58 _description = """Start an IPython cluster for parallel computing.
58 _description = """Start an IPython cluster for parallel computing.
59
59
60 An IPython cluster consists of 1 controller and 1 or more engines.
60 An IPython cluster consists of 1 controller and 1 or more engines.
61 This command automates the startup of these processes using a wide
61 This command automates the startup of these processes using a wide
62 range of startup methods (SSH, local processes, PBS, mpiexec,
62 range of startup methods (SSH, local processes, PBS, mpiexec,
63 Windows HPC Server 2008). To start a cluster with 4 engines on your
63 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 local host simply do 'ipcluster start n=4'. For more complex usage
64 local host simply do 'ipcluster start n=4'. For more complex usage
65 you will typically do 'ipcluster create profile=mycluster', then edit
65 you will typically do 'ipcluster create profile=mycluster', then edit
66 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
66 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
67 """
67 """
68
68
69
69
70 # Exit codes for ipcluster
70 # Exit codes for ipcluster
71
71
72 # This will be the exit code if the ipcluster appears to be running because
72 # This will be the exit code if the ipcluster appears to be running because
73 # a .pid file exists
73 # a .pid file exists
74 ALREADY_STARTED = 10
74 ALREADY_STARTED = 10
75
75
76
76
77 # This will be the exit code if ipcluster stop is run, but there is no .pid
77 # This will be the exit code if ipcluster stop is run, but there is no .pid
78 # file to be found.
78 # file to be found.
79 ALREADY_STOPPED = 11
79 ALREADY_STOPPED = 11
80
80
81 # This will be the exit code if ipcluster engines is run, but there is no .pid
81 # This will be the exit code if ipcluster engines is run, but there is no .pid
82 # file to be found.
82 # file to be found.
83 NO_CLUSTER = 12
83 NO_CLUSTER = 12
84
84
85
85
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 # Main application
87 # Main application
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89 start_help = """Start an IPython cluster for parallel computing
89 start_help = """Start an IPython cluster for parallel computing
90
90
91 Start an ipython cluster by its profile name or cluster
91 Start an ipython cluster by its profile name or cluster
92 directory. Cluster directories contain configuration, log and
92 directory. Cluster directories contain configuration, log and
93 security related files and are named using the convention
93 security related files and are named using the convention
94 'profile_<name>' and should be created using the 'start'
94 'profile_<name>' and should be created using the 'start'
95 subcommand of 'ipcluster'. If your cluster directory is in
95 subcommand of 'ipcluster'. If your cluster directory is in
96 the cwd or the ipython directory, you can simply refer to it
96 the cwd or the ipython directory, you can simply refer to it
97 using its profile name, 'ipcluster start n=4 profile=<profile>',
97 using its profile name, 'ipcluster start n=4 profile=<profile>',
98 otherwise use the 'profile_dir' option.
98 otherwise use the 'profile_dir' option.
99 """
99 """
100 stop_help = """Stop a running IPython cluster
100 stop_help = """Stop a running IPython cluster
101
101
102 Stop a running ipython cluster by its profile name or cluster
102 Stop a running ipython cluster by its profile name or cluster
103 directory. Cluster directories are named using the convention
103 directory. Cluster directories are named using the convention
104 'profile_<name>'. If your cluster directory is in
104 'profile_<name>'. If your cluster directory is in
105 the cwd or the ipython directory, you can simply refer to it
105 the cwd or the ipython directory, you can simply refer to it
106 using its profile name, 'ipcluster stop profile=<profile>', otherwise
106 using its profile name, 'ipcluster stop profile=<profile>', otherwise
107 use the 'profile_dir' option.
107 use the 'profile_dir' option.
108 """
108 """
109 engines_help = """Start engines connected to an existing IPython cluster
109 engines_help = """Start engines connected to an existing IPython cluster
110
110
111 Start one or more engines to connect to an existing Cluster
111 Start one or more engines to connect to an existing Cluster
112 by profile name or cluster directory.
112 by profile name or cluster directory.
113 Cluster directories contain configuration, log and
113 Cluster directories contain configuration, log and
114 security related files and are named using the convention
114 security related files and are named using the convention
115 'profile_<name>' and should be created using the 'start'
115 'profile_<name>' and should be created using the 'start'
116 subcommand of 'ipcluster'. If your cluster directory is in
116 subcommand of 'ipcluster'. If your cluster directory is in
117 the cwd or the ipython directory, you can simply refer to it
117 the cwd or the ipython directory, you can simply refer to it
118 using its profile name, 'ipcluster engines n=4 profile=<profile>',
118 using its profile name, 'ipcluster engines n=4 profile=<profile>',
119 otherwise use the 'profile_dir' option.
119 otherwise use the 'profile_dir' option.
120 """
120 """
121 stop_aliases = dict(
121 stop_aliases = dict(
122 signal='IPClusterStop.signal',
122 signal='IPClusterStop.signal',
123 )
123 )
124 stop_aliases.update(base_aliases)
124 stop_aliases.update(base_aliases)
125
125
126 class IPClusterStop(BaseParallelApplication):
126 class IPClusterStop(BaseParallelApplication):
127 name = u'ipcluster'
127 name = u'ipcluster'
128 description = stop_help
128 description = stop_help
129 config_file_name = Unicode(default_config_file_name)
129 config_file_name = Unicode(default_config_file_name)
130
130
131 signal = Int(signal.SIGINT, config=True,
131 signal = Int(signal.SIGINT, config=True,
132 help="signal to use for stopping processes.")
132 help="signal to use for stopping processes.")
133
133
134 aliases = Dict(stop_aliases)
134 aliases = Dict(stop_aliases)
135
135
136 def start(self):
136 def start(self):
137 """Start the app for the stop subcommand."""
137 """Start the app for the stop subcommand."""
138 try:
138 try:
139 pid = self.get_pid_from_file()
139 pid = self.get_pid_from_file()
140 except PIDFileError:
140 except PIDFileError:
141 self.log.critical(
141 self.log.critical(
142 'Could not read pid file, cluster is probably not running.'
142 'Could not read pid file, cluster is probably not running.'
143 )
143 )
144 # Here I exit with an unusual exit status that other processes
144 # Here I exit with an unusual exit status that other processes
145 # can watch for to learn how I exited.
145 # can watch for to learn how I exited.
146 self.remove_pid_file()
146 self.remove_pid_file()
147 self.exit(ALREADY_STOPPED)
147 self.exit(ALREADY_STOPPED)
148
148
149 if not self.check_pid(pid):
149 if not self.check_pid(pid):
150 self.log.critical(
150 self.log.critical(
151 'Cluster [pid=%r] is not running.' % pid
151 'Cluster [pid=%r] is not running.' % pid
152 )
152 )
153 self.remove_pid_file()
153 self.remove_pid_file()
154 # Here I exit with an unusual exit status that other processes
154 # Here I exit with an unusual exit status that other processes
155 # can watch for to learn how I exited.
155 # can watch for to learn how I exited.
156 self.exit(ALREADY_STOPPED)
156 self.exit(ALREADY_STOPPED)
157
157
158 elif os.name=='posix':
158 elif os.name=='posix':
159 sig = self.signal
159 sig = self.signal
160 self.log.info(
160 self.log.info(
161 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
161 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
162 )
162 )
163 try:
163 try:
164 os.kill(pid, sig)
164 os.kill(pid, sig)
165 except OSError:
165 except OSError:
166 self.log.error("Stopping cluster failed, assuming already dead.",
166 self.log.error("Stopping cluster failed, assuming already dead.",
167 exc_info=True)
167 exc_info=True)
168 self.remove_pid_file()
168 self.remove_pid_file()
169 elif os.name=='nt':
169 elif os.name=='nt':
170 try:
170 try:
171 # kill the whole tree
171 # kill the whole tree
172 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
172 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
173 except (CalledProcessError, OSError):
173 except (CalledProcessError, OSError):
174 self.log.error("Stopping cluster failed, assuming already dead.",
174 self.log.error("Stopping cluster failed, assuming already dead.",
175 exc_info=True)
175 exc_info=True)
176 self.remove_pid_file()
176 self.remove_pid_file()
177
177
178 engine_aliases = {}
178 engine_aliases = {}
179 engine_aliases.update(base_aliases)
179 engine_aliases.update(base_aliases)
180 engine_aliases.update(dict(
180 engine_aliases.update(dict(
181 n='IPClusterEngines.n',
181 n='IPClusterEngines.n',
182 elauncher = 'IPClusterEngines.engine_launcher_class',
182 engines = 'IPClusterEngines.engine_launcher_class',
183 daemonize = 'IPClusterEngines.daemonize',
183 daemonize = 'IPClusterEngines.daemonize',
184 ))
184 ))
185 engine_flags = {}
185 engine_flags = {}
186 engine_flags.update(base_flags)
186 engine_flags.update(base_flags)
187
187
188 engine_flags.update(dict(
188 engine_flags.update(dict(
189 daemonize=(
189 daemonize=(
190 {'IPClusterEngines' : {'daemonize' : True}},
190 {'IPClusterEngines' : {'daemonize' : True}},
191 """run the cluster in the background (not available on Windows)""",
191 """run the cluster in the background (not available on Windows)""",
192 )
192 )
193 ))
193 ))
194 class IPClusterEngines(BaseParallelApplication):
194 class IPClusterEngines(BaseParallelApplication):
195
195
196 name = u'ipcluster'
196 name = u'ipcluster'
197 description = engines_help
197 description = engines_help
198 usage = None
198 usage = None
199 config_file_name = Unicode(default_config_file_name)
199 config_file_name = Unicode(default_config_file_name)
200 default_log_level = logging.INFO
200 default_log_level = logging.INFO
201 classes = List()
201 classes = List()
202 def _classes_default(self):
202 def _classes_default(self):
203 from IPython.parallel.apps import launcher
203 from IPython.parallel.apps import launcher
204 launchers = launcher.all_launchers
204 launchers = launcher.all_launchers
205 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
205 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
206 return [ProfileDir]+eslaunchers
206 return [ProfileDir]+eslaunchers
207
207
208 n = Int(2, config=True,
208 n = Int(2, config=True,
209 help="The number of engines to start.")
209 help="The number of engines to start.")
210
210
211 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
211 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
212 config=True,
212 config=True,
213 help="The class for launching a set of Engines."
213 help="The class for launching a set of Engines."
214 )
214 )
215 daemonize = Bool(False, config=True,
215 daemonize = Bool(False, config=True,
216 help="""Daemonize the ipcluster program. This implies --log-to-file.
216 help="""Daemonize the ipcluster program. This implies --log-to-file.
217 Not available on Windows.
217 Not available on Windows.
218 """)
218 """)
219
219
220 def _daemonize_changed(self, name, old, new):
220 def _daemonize_changed(self, name, old, new):
221 if new:
221 if new:
222 self.log_to_file = True
222 self.log_to_file = True
223
223
224 aliases = Dict(engine_aliases)
224 aliases = Dict(engine_aliases)
225 flags = Dict(engine_flags)
225 flags = Dict(engine_flags)
226 _stopping = False
226 _stopping = False
227
227
228 def initialize(self, argv=None):
228 def initialize(self, argv=None):
229 super(IPClusterEngines, self).initialize(argv)
229 super(IPClusterEngines, self).initialize(argv)
230 self.init_signal()
230 self.init_signal()
231 self.init_launchers()
231 self.init_launchers()
232
232
233 def init_launchers(self):
233 def init_launchers(self):
234 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
234 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
235 self.engine_launcher.on_stop(lambda r: self.loop.stop())
235 self.engine_launcher.on_stop(lambda r: self.loop.stop())
236
236
237 def init_signal(self):
237 def init_signal(self):
238 # Setup signals
238 # Setup signals
239 signal.signal(signal.SIGINT, self.sigint_handler)
239 signal.signal(signal.SIGINT, self.sigint_handler)
240
240
241 def build_launcher(self, clsname):
241 def build_launcher(self, clsname):
242 """import and instantiate a Launcher based on importstring"""
242 """import and instantiate a Launcher based on importstring"""
243 if '.' not in clsname:
243 if '.' not in clsname:
244 # not a module, presume it's the raw name in apps.launcher
244 # not a module, presume it's the raw name in apps.launcher
245 clsname = 'IPython.parallel.apps.launcher.'+clsname
245 clsname = 'IPython.parallel.apps.launcher.'+clsname
246 # print repr(clsname)
246 # print repr(clsname)
247 klass = import_item(clsname)
247 klass = import_item(clsname)
248
248
249 launcher = klass(
249 launcher = klass(
250 work_dir=self.profile_dir.location, config=self.config, log=self.log
250 work_dir=self.profile_dir.location, config=self.config, log=self.log
251 )
251 )
252 return launcher
252 return launcher
253
253
254 def start_engines(self):
254 def start_engines(self):
255 self.log.info("Starting %i engines"%self.n)
255 self.log.info("Starting %i engines"%self.n)
256 self.engine_launcher.start(
256 self.engine_launcher.start(
257 self.n,
257 self.n,
258 self.profile_dir.location
258 self.profile_dir.location
259 )
259 )
260
260
261 def stop_engines(self):
261 def stop_engines(self):
262 self.log.info("Stopping Engines...")
262 self.log.info("Stopping Engines...")
263 if self.engine_launcher.running:
263 if self.engine_launcher.running:
264 d = self.engine_launcher.stop()
264 d = self.engine_launcher.stop()
265 return d
265 return d
266 else:
266 else:
267 return None
267 return None
268
268
269 def stop_launchers(self, r=None):
269 def stop_launchers(self, r=None):
270 if not self._stopping:
270 if not self._stopping:
271 self._stopping = True
271 self._stopping = True
272 self.log.error("IPython cluster: stopping")
272 self.log.error("IPython cluster: stopping")
273 self.stop_engines()
273 self.stop_engines()
274 # Wait a few seconds to let things shut down.
274 # Wait a few seconds to let things shut down.
275 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
275 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
276 dc.start()
276 dc.start()
277
277
278 def sigint_handler(self, signum, frame):
278 def sigint_handler(self, signum, frame):
279 self.log.debug("SIGINT received, stopping launchers...")
279 self.log.debug("SIGINT received, stopping launchers...")
280 self.stop_launchers()
280 self.stop_launchers()
281
281
282 def start_logging(self):
282 def start_logging(self):
283 # Remove old log files of the controller and engine
283 # Remove old log files of the controller and engine
284 if self.clean_logs:
284 if self.clean_logs:
285 log_dir = self.profile_dir.log_dir
285 log_dir = self.profile_dir.log_dir
286 for f in os.listdir(log_dir):
286 for f in os.listdir(log_dir):
287 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
287 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
288 os.remove(os.path.join(log_dir, f))
288 os.remove(os.path.join(log_dir, f))
289 # This will remove old log files for ipcluster itself
289 # This will remove old log files for ipcluster itself
290 # super(IPBaseParallelApplication, self).start_logging()
290 # super(IPBaseParallelApplication, self).start_logging()
291
291
292 def start(self):
292 def start(self):
293 """Start the app for the engines subcommand."""
293 """Start the app for the engines subcommand."""
294 self.log.info("IPython cluster: started")
294 self.log.info("IPython cluster: started")
295 # First see if the cluster is already running
295 # First see if the cluster is already running
296
296
297 # Now log and daemonize
297 # Now log and daemonize
298 self.log.info(
298 self.log.info(
299 'Starting engines with [daemon=%r]' % self.daemonize
299 'Starting engines with [daemon=%r]' % self.daemonize
300 )
300 )
301 # TODO: Get daemonize working on Windows or as a Windows Server.
301 # TODO: Get daemonize working on Windows or as a Windows Server.
302 if self.daemonize:
302 if self.daemonize:
303 if os.name=='posix':
303 if os.name=='posix':
304 daemonize()
304 daemonize()
305
305
306 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
306 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
307 dc.start()
307 dc.start()
308 # Now write the new pid file AFTER our new forked pid is active.
308 # Now write the new pid file AFTER our new forked pid is active.
309 # self.write_pid_file()
309 # self.write_pid_file()
310 try:
310 try:
311 self.loop.start()
311 self.loop.start()
312 except KeyboardInterrupt:
312 except KeyboardInterrupt:
313 pass
313 pass
314 except zmq.ZMQError as e:
314 except zmq.ZMQError as e:
315 if e.errno == errno.EINTR:
315 if e.errno == errno.EINTR:
316 pass
316 pass
317 else:
317 else:
318 raise
318 raise
319
319
320 start_aliases = {}
320 start_aliases = {}
321 start_aliases.update(engine_aliases)
321 start_aliases.update(engine_aliases)
322 start_aliases.update(dict(
322 start_aliases.update(dict(
323 delay='IPClusterStart.delay',
323 delay='IPClusterStart.delay',
324 clean_logs='IPClusterStart.clean_logs',
324 clean_logs='IPClusterStart.clean_logs',
325 controller = 'IPClusterStart.controller_launcher_class',
325 ))
326 ))
326
327
327 class IPClusterStart(IPClusterEngines):
328 class IPClusterStart(IPClusterEngines):
328
329
329 name = u'ipcluster'
330 name = u'ipcluster'
330 description = start_help
331 description = start_help
331 default_log_level = logging.INFO
332 default_log_level = logging.INFO
332 auto_create = Bool(True, config=True,
333 auto_create = Bool(True, config=True,
333 help="whether to create the profile_dir if it doesn't exist")
334 help="whether to create the profile_dir if it doesn't exist")
334 classes = List()
335 classes = List()
335 def _classes_default(self,):
336 def _classes_default(self,):
336 from IPython.parallel.apps import launcher
337 from IPython.parallel.apps import launcher
337 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
338 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
338
339
339 clean_logs = Bool(True, config=True,
340 clean_logs = Bool(True, config=True,
340 help="whether to cleanup old logs before starting")
341 help="whether to cleanup old logs before starting")
341
342
342 delay = CFloat(1., config=True,
343 delay = CFloat(1., config=True,
343 help="delay (in s) between starting the controller and the engines")
344 help="delay (in s) between starting the controller and the engines")
344
345
345 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
346 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
346 config=True,
347 config=True,
347 help="The class for launching a Controller."
348 help="The class for launching a Controller."
348 )
349 )
349 reset = Bool(False, config=True,
350 reset = Bool(False, config=True,
350 help="Whether to reset config files as part of '--create'."
351 help="Whether to reset config files as part of '--create'."
351 )
352 )
352
353
353 # flags = Dict(flags)
354 # flags = Dict(flags)
354 aliases = Dict(start_aliases)
355 aliases = Dict(start_aliases)
355
356
356 def init_launchers(self):
357 def init_launchers(self):
357 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
358 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
358 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
359 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
359 self.controller_launcher.on_stop(self.stop_launchers)
360 self.controller_launcher.on_stop(self.stop_launchers)
360
361
361 def start_controller(self):
362 def start_controller(self):
362 self.controller_launcher.start(
363 self.controller_launcher.start(
363 self.profile_dir.location
364 self.profile_dir.location
364 )
365 )
365
366
366 def stop_controller(self):
367 def stop_controller(self):
367 # self.log.info("In stop_controller")
368 # self.log.info("In stop_controller")
368 if self.controller_launcher and self.controller_launcher.running:
369 if self.controller_launcher and self.controller_launcher.running:
369 return self.controller_launcher.stop()
370 return self.controller_launcher.stop()
370
371
371 def stop_launchers(self, r=None):
372 def stop_launchers(self, r=None):
372 if not self._stopping:
373 if not self._stopping:
373 self.stop_controller()
374 self.stop_controller()
374 super(IPClusterStart, self).stop_launchers()
375 super(IPClusterStart, self).stop_launchers()
375
376
376 def start(self):
377 def start(self):
377 """Start the app for the start subcommand."""
378 """Start the app for the start subcommand."""
378 # First see if the cluster is already running
379 # First see if the cluster is already running
379 try:
380 try:
380 pid = self.get_pid_from_file()
381 pid = self.get_pid_from_file()
381 except PIDFileError:
382 except PIDFileError:
382 pass
383 pass
383 else:
384 else:
384 if self.check_pid(pid):
385 if self.check_pid(pid):
385 self.log.critical(
386 self.log.critical(
386 'Cluster is already running with [pid=%s]. '
387 'Cluster is already running with [pid=%s]. '
387 'use "ipcluster stop" to stop the cluster.' % pid
388 'use "ipcluster stop" to stop the cluster.' % pid
388 )
389 )
389 # Here I exit with an unusual exit status that other processes
390 # Here I exit with an unusual exit status that other processes
390 # can watch for to learn how I exited.
391 # can watch for to learn how I exited.
391 self.exit(ALREADY_STARTED)
392 self.exit(ALREADY_STARTED)
392 else:
393 else:
393 self.remove_pid_file()
394 self.remove_pid_file()
394
395
395
396
396 # Now log and daemonize
397 # Now log and daemonize
397 self.log.info(
398 self.log.info(
398 'Starting ipcluster with [daemon=%r]' % self.daemonize
399 'Starting ipcluster with [daemon=%r]' % self.daemonize
399 )
400 )
400 # TODO: Get daemonize working on Windows or as a Windows Server.
401 # TODO: Get daemonize working on Windows or as a Windows Server.
401 if self.daemonize:
402 if self.daemonize:
402 if os.name=='posix':
403 if os.name=='posix':
403 daemonize()
404 daemonize()
404
405
405 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
406 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
406 dc.start()
407 dc.start()
407 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
408 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
408 dc.start()
409 dc.start()
409 # Now write the new pid file AFTER our new forked pid is active.
410 # Now write the new pid file AFTER our new forked pid is active.
410 self.write_pid_file()
411 self.write_pid_file()
411 try:
412 try:
412 self.loop.start()
413 self.loop.start()
413 except KeyboardInterrupt:
414 except KeyboardInterrupt:
414 pass
415 pass
415 except zmq.ZMQError as e:
416 except zmq.ZMQError as e:
416 if e.errno == errno.EINTR:
417 if e.errno == errno.EINTR:
417 pass
418 pass
418 else:
419 else:
419 raise
420 raise
420 finally:
421 finally:
421 self.remove_pid_file()
422 self.remove_pid_file()
422
423
423 base='IPython.parallel.apps.ipclusterapp.IPCluster'
424 base='IPython.parallel.apps.ipclusterapp.IPCluster'
424
425
425 class IPClusterApp(Application):
426 class IPClusterApp(Application):
426 name = u'ipcluster'
427 name = u'ipcluster'
427 description = _description
428 description = _description
428
429
429 subcommands = {
430 subcommands = {
430 'start' : (base+'Start', start_help),
431 'start' : (base+'Start', start_help),
431 'stop' : (base+'Stop', stop_help),
432 'stop' : (base+'Stop', stop_help),
432 'engines' : (base+'Engines', engines_help),
433 'engines' : (base+'Engines', engines_help),
433 }
434 }
434
435
435 # no aliases or flags for parent App
436 # no aliases or flags for parent App
436 aliases = Dict()
437 aliases = Dict()
437 flags = Dict()
438 flags = Dict()
438
439
439 def start(self):
440 def start(self):
440 if self.subapp is None:
441 if self.subapp is None:
441 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
442 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
442 print
443 print
443 self.print_description()
444 self.print_description()
444 self.print_subcommands()
445 self.print_subcommands()
445 self.exit(1)
446 self.exit(1)
446 else:
447 else:
447 return self.subapp.start()
448 return self.subapp.start()
448
449
449 def launch_new_instance():
450 def launch_new_instance():
450 """Create and run the IPython cluster."""
451 """Create and run the IPython cluster."""
451 app = IPClusterApp.instance()
452 app = IPClusterApp.instance()
452 app.initialize()
453 app.initialize()
453 app.start()
454 app.start()
454
455
455
456
456 if __name__ == '__main__':
457 if __name__ == '__main__':
457 launch_new_instance()
458 launch_new_instance()
458
459
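Aside on the 'engines' and 'controller' aliases: they feed DottedObjectName traits, and build_launcher (earlier in this file) prefixes a bare class name with the launcher package before importing it, so either a bare or a fully dotted name works. A small self-contained sketch of that resolution step, using a hypothetical helper name:

def resolve_launcher_name(clsname, default_package='IPython.parallel.apps.launcher'):
    """Return a fully dotted import path for a launcher class name.

    Mirrors the prefixing in IPClusterEngines.build_launcher: a name without
    a dot is assumed to live in the default launcher package.
    """
    if '.' not in clsname:
        clsname = default_package + '.' + clsname
    return clsname

# Both spellings resolve to the same import path:
assert resolve_launcher_name('LocalEngineSetLauncher') == \
       resolve_launcher_name('IPython.parallel.apps.launcher.LocalEngineSetLauncher')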
@@ -1,428 +1,423 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import os
26 import os
27 import socket
27 import socket
28 import stat
28 import stat
29 import sys
29 import sys
30 import uuid
30 import uuid
31
31
32 from multiprocessing import Process
32 from multiprocessing import Process
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37 from zmq.utils import jsonapi as json
37 from zmq.utils import jsonapi as json
38
38
39 from IPython.config.application import boolean_flag
39 from IPython.config.application import boolean_flag
40 from IPython.core.profiledir import ProfileDir
40 from IPython.core.profiledir import ProfileDir
41
41
42 from IPython.parallel.apps.baseapp import (
42 from IPython.parallel.apps.baseapp import (
43 BaseParallelApplication,
43 BaseParallelApplication,
44 base_flags
44 base_aliases,
45 base_flags,
45 )
46 )
46 from IPython.utils.importstring import import_item
47 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48
49
49 # from IPython.parallel.controller.controller import ControllerFactory
50 # from IPython.parallel.controller.controller import ControllerFactory
50 from IPython.zmq.session import Session
51 from IPython.zmq.session import Session
51 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 from IPython.parallel.controller.hub import HubFactory
53 from IPython.parallel.controller.hub import HubFactory
53 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
55
56
56 from IPython.parallel.util import signal_children, split_url
57 from IPython.parallel.util import signal_children, split_url
57
58
58 # conditional import of MongoDB backend class
59 # conditional import of MongoDB backend class
59
60
60 try:
61 try:
61 from IPython.parallel.controller.mongodb import MongoDB
62 from IPython.parallel.controller.mongodb import MongoDB
62 except ImportError:
63 except ImportError:
63 maybe_mongo = []
64 maybe_mongo = []
64 else:
65 else:
65 maybe_mongo = [MongoDB]
66 maybe_mongo = [MongoDB]
66
67
67
68
68 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
69 # Module level variables
70 # Module level variables
70 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
71
72
72
73
73 #: The default config file name for this application
74 #: The default config file name for this application
74 default_config_file_name = u'ipcontroller_config.py'
75 default_config_file_name = u'ipcontroller_config.py'
75
76
76
77
77 _description = """Start the IPython controller for parallel computing.
78 _description = """Start the IPython controller for parallel computing.
78
79
79 The IPython controller provides a gateway between the IPython engines and
80 The IPython controller provides a gateway between the IPython engines and
80 clients. The controller needs to be started before the engines and can be
81 clients. The controller needs to be started before the engines and can be
81 configured using command line options or using a cluster directory. Cluster
82 configured using command line options or using a cluster directory. Cluster
82 directories contain config, log and security files and are usually located in
83 directories contain config, log and security files and are usually located in
83 your ipython directory and named as "profile_name". See the `profile`
84 your ipython directory and named as "profile_name". See the `profile`
84 and `profile_dir` options for details.
85 and `profile_dir` options for details.
85 """
86 """
86
87
87
88
88
89
89
90
90 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
91 # The main application
92 # The main application
92 #-----------------------------------------------------------------------------
93 #-----------------------------------------------------------------------------
93 flags = {}
94 flags = {}
94 flags.update(base_flags)
95 flags.update(base_flags)
95 flags.update({
96 flags.update({
96 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
97 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
97 'Use threads instead of processes for the schedulers'),
98 'Use threads instead of processes for the schedulers'),
98 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
99 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
99 'use the SQLiteDB backend'),
100 'use the SQLiteDB backend'),
100 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
101 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
101 'use the MongoDB backend'),
102 'use the MongoDB backend'),
102 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
103 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
103 'use the in-memory DictDB backend'),
104 'use the in-memory DictDB backend'),
104 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
105 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
105 'reuse existing json connection files')
106 'reuse existing json connection files')
106 })
107 })
107
108
108 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
109 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
109 "Use HMAC digests for authentication of messages.",
110 "Use HMAC digests for authentication of messages.",
110 "Don't authenticate messages."
111 "Don't authenticate messages."
111 ))
112 ))
113 aliases = dict(
114 reuse_files = 'IPControllerApp.reuse_files',
115 secure = 'IPControllerApp.secure',
116 ssh = 'IPControllerApp.ssh_server',
117 use_threads = 'IPControllerApp.use_threads',
118 location = 'IPControllerApp.location',
119
120 ident = 'Session.session',
121 user = 'Session.username',
122 exec_key = 'Session.keyfile',
123
124 url = 'HubFactory.url',
125 ip = 'HubFactory.ip',
126 transport = 'HubFactory.transport',
127 port = 'HubFactory.regport',
128
129 ping = 'HeartMonitor.period',
130
131 scheme = 'TaskScheduler.scheme_name',
132 hwm = 'TaskScheduler.hwm',
133 )
134 aliases.update(base_aliases)
112
135
113 class IPControllerApp(BaseParallelApplication):
136 class IPControllerApp(BaseParallelApplication):
114
137
115 name = u'ipcontroller'
138 name = u'ipcontroller'
116 description = _description
139 description = _description
117 config_file_name = Unicode(default_config_file_name)
140 config_file_name = Unicode(default_config_file_name)
118 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
141 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
119
142
120 # change default to True
143 # change default to True
121 auto_create = Bool(True, config=True,
144 auto_create = Bool(True, config=True,
122 help="""Whether to create profile dir if it doesn't exist.""")
145 help="""Whether to create profile dir if it doesn't exist.""")
123
146
124 reuse_files = Bool(False, config=True,
147 reuse_files = Bool(False, config=True,
125 help='Whether to reuse existing json connection files.'
148 help='Whether to reuse existing json connection files.'
126 )
149 )
127 secure = Bool(True, config=True,
150 secure = Bool(True, config=True,
128 help='Whether to use HMAC digests for extra message authentication.'
151 help='Whether to use HMAC digests for extra message authentication.'
129 )
152 )
130 ssh_server = Unicode(u'', config=True,
153 ssh_server = Unicode(u'', config=True,
131 help="""ssh url for clients to use when connecting to the Controller
154 help="""ssh url for clients to use when connecting to the Controller
132 processes. It should be of the form: [user@]server[:port]. The
155 processes. It should be of the form: [user@]server[:port]. The
133 Controller's listening addresses must be accessible from the ssh server""",
156 Controller's listening addresses must be accessible from the ssh server""",
134 )
157 )
135 location = Unicode(u'', config=True,
158 location = Unicode(u'', config=True,
136 help="""The external IP or domain name of the Controller, used for disambiguating
159 help="""The external IP or domain name of the Controller, used for disambiguating
137 engine and client connections.""",
160 engine and client connections.""",
138 )
161 )
139 import_statements = List([], config=True,
162 import_statements = List([], config=True,
140 help="import statements to be run at startup. Necessary in some environments"
163 help="import statements to be run at startup. Necessary in some environments"
141 )
164 )
142
165
143 use_threads = Bool(False, config=True,
166 use_threads = Bool(False, config=True,
144 help='Use threads instead of processes for the schedulers',
167 help='Use threads instead of processes for the schedulers',
145 )
168 )
146
169
147 # internal
170 # internal
148 children = List()
171 children = List()
149 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
172 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
150
173
151 def _use_threads_changed(self, name, old, new):
174 def _use_threads_changed(self, name, old, new):
152 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
175 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
153
176
154 aliases = Dict(dict(
177 aliases = Dict(aliases)
155 log_level = 'IPControllerApp.log_level',
156 log_url = 'IPControllerApp.log_url',
157 reuse_files = 'IPControllerApp.reuse_files',
158 secure = 'IPControllerApp.secure',
159 ssh = 'IPControllerApp.ssh_server',
160 use_threads = 'IPControllerApp.use_threads',
161 import_statements = 'IPControllerApp.import_statements',
162 location = 'IPControllerApp.location',
163
164 ident = 'Session.session',
165 user = 'Session.username',
166 exec_key = 'Session.keyfile',
167
168 url = 'HubFactory.url',
169 ip = 'HubFactory.ip',
170 transport = 'HubFactory.transport',
171 port = 'HubFactory.regport',
172
173 ping = 'HeartMonitor.period',
174
175 scheme = 'TaskScheduler.scheme_name',
176 hwm = 'TaskScheduler.hwm',
177
178
179 profile = "BaseIPythonApplication.profile",
180 profile_dir = 'ProfileDir.location',
181
182 ))
183 flags = Dict(flags)
178 flags = Dict(flags)
184
179
185
180
186 def save_connection_dict(self, fname, cdict):
181 def save_connection_dict(self, fname, cdict):
187 """save a connection dict to json file."""
182 """save a connection dict to json file."""
188 c = self.config
183 c = self.config
189 url = cdict['url']
184 url = cdict['url']
190 location = cdict['location']
185 location = cdict['location']
191 if not location:
186 if not location:
192 try:
187 try:
193 proto,ip,port = split_url(url)
188 proto,ip,port = split_url(url)
194 except AssertionError:
189 except AssertionError:
195 pass
190 pass
196 else:
191 else:
197 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
192 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
198 cdict['location'] = location
193 cdict['location'] = location
199 fname = os.path.join(self.profile_dir.security_dir, fname)
194 fname = os.path.join(self.profile_dir.security_dir, fname)
200 with open(fname, 'wb') as f:
195 with open(fname, 'wb') as f:
201 f.write(json.dumps(cdict, indent=2))
196 f.write(json.dumps(cdict, indent=2))
202 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
197 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
203
198
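
For reference, save_connection_dict above serializes the connection info to JSON in the profile's security directory and restricts the file to owner read/write. An illustrative (not verbatim) sketch of what ends up in ipcontroller-client.json:

    # keys match the cdict built in init_hub; values are illustrative
    cdict = {
        'exec_key' : 'f7c0...-illustrative-uuid',
        'ssh'      : '',
        'url'      : 'tcp://127.0.0.1:10101',
        'location' : '10.0.0.5',
    }
    # json.dumps(cdict, indent=2) is what gets written, then chmod'ed to 0600
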
204 def load_config_from_json(self):
199 def load_config_from_json(self):
205 """load config from existing json connector files."""
200 """load config from existing json connector files."""
206 c = self.config
201 c = self.config
207 # load from engine config
202 # load from engine config
208 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
203 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
209 cfg = json.loads(f.read())
204 cfg = json.loads(f.read())
210 key = c.Session.key = cfg['exec_key']
205 key = c.Session.key = cfg['exec_key']
211 xport,addr = cfg['url'].split('://')
206 xport,addr = cfg['url'].split('://')
212 c.HubFactory.engine_transport = xport
207 c.HubFactory.engine_transport = xport
213 ip,ports = addr.split(':')
208 ip,ports = addr.split(':')
214 c.HubFactory.engine_ip = ip
209 c.HubFactory.engine_ip = ip
215 c.HubFactory.regport = int(ports)
210 c.HubFactory.regport = int(ports)
216 self.location = cfg['location']
211 self.location = cfg['location']
217
212
218 # load client config
213 # load client config
219 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
214 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
220 cfg = json.loads(f.read())
215 cfg = json.loads(f.read())
221 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
216 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
222 xport,addr = cfg['url'].split('://')
217 xport,addr = cfg['url'].split('://')
223 c.HubFactory.client_transport = xport
218 c.HubFactory.client_transport = xport
224 ip,ports = addr.split(':')
219 ip,ports = addr.split(':')
225 c.HubFactory.client_ip = ip
220 c.HubFactory.client_ip = ip
226 self.ssh_server = cfg['ssh']
221 self.ssh_server = cfg['ssh']
227 assert int(ports) == c.HubFactory.regport, "regport mismatch"
222 assert int(ports) == c.HubFactory.regport, "regport mismatch"
228
223
229 def init_hub(self):
224 def init_hub(self):
230 c = self.config
225 c = self.config
231
226
232 self.do_import_statements()
227 self.do_import_statements()
233 reusing = self.reuse_files
228 reusing = self.reuse_files
234 if reusing:
229 if reusing:
235 try:
230 try:
236 self.load_config_from_json()
231 self.load_config_from_json()
237 except (AssertionError,IOError):
232 except (AssertionError,IOError):
238 reusing=False
233 reusing=False
239 # check again, because reusing may have failed:
234 # check again, because reusing may have failed:
240 if reusing:
235 if reusing:
241 pass
236 pass
242 elif self.secure:
237 elif self.secure:
243 key = str(uuid.uuid4())
238 key = str(uuid.uuid4())
244 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
239 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
245 # with open(keyfile, 'w') as f:
240 # with open(keyfile, 'w') as f:
246 # f.write(key)
241 # f.write(key)
247 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
248 c.Session.key = key
243 c.Session.key = key
249 else:
244 else:
250 key = c.Session.key = ''
245 key = c.Session.key = ''
251
246
252 try:
247 try:
253 self.factory = HubFactory(config=c, log=self.log)
248 self.factory = HubFactory(config=c, log=self.log)
254 # self.start_logging()
249 # self.start_logging()
255 self.factory.init_hub()
250 self.factory.init_hub()
256 except:
251 except:
257 self.log.error("Couldn't construct the Controller", exc_info=True)
252 self.log.error("Couldn't construct the Controller", exc_info=True)
258 self.exit(1)
253 self.exit(1)
259
254
260 if not reusing:
255 if not reusing:
261 # save to new json config files
256 # save to new json config files
262 f = self.factory
257 f = self.factory
263 cdict = {'exec_key' : key,
258 cdict = {'exec_key' : key,
264 'ssh' : self.ssh_server,
259 'ssh' : self.ssh_server,
265 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
260 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
266 'location' : self.location
261 'location' : self.location
267 }
262 }
268 self.save_connection_dict('ipcontroller-client.json', cdict)
263 self.save_connection_dict('ipcontroller-client.json', cdict)
269 edict = cdict
264 edict = cdict
270 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
265 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
271 self.save_connection_dict('ipcontroller-engine.json', edict)
266 self.save_connection_dict('ipcontroller-engine.json', edict)
272
267
273 #
268 #
274 def init_schedulers(self):
269 def init_schedulers(self):
275 children = self.children
270 children = self.children
276 mq = import_item(str(self.mq_class))
271 mq = import_item(str(self.mq_class))
277
272
278 hub = self.factory
273 hub = self.factory
279 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
274 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
280 # IOPub relay (in a Process)
275 # IOPub relay (in a Process)
281 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
282 q.bind_in(hub.client_info['iopub'])
277 q.bind_in(hub.client_info['iopub'])
283 q.bind_out(hub.engine_info['iopub'])
278 q.bind_out(hub.engine_info['iopub'])
284 q.setsockopt_out(zmq.SUBSCRIBE, '')
279 q.setsockopt_out(zmq.SUBSCRIBE, '')
285 q.connect_mon(hub.monitor_url)
280 q.connect_mon(hub.monitor_url)
286 q.daemon=True
281 q.daemon=True
287 children.append(q)
282 children.append(q)
288
283
289 # Multiplexer Queue (in a Process)
284 # Multiplexer Queue (in a Process)
290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
291 q.bind_in(hub.client_info['mux'])
286 q.bind_in(hub.client_info['mux'])
292 q.setsockopt_in(zmq.IDENTITY, 'mux')
287 q.setsockopt_in(zmq.IDENTITY, 'mux')
293 q.bind_out(hub.engine_info['mux'])
288 q.bind_out(hub.engine_info['mux'])
294 q.connect_mon(hub.monitor_url)
289 q.connect_mon(hub.monitor_url)
295 q.daemon=True
290 q.daemon=True
296 children.append(q)
291 children.append(q)
297
292
298 # Control Queue (in a Process)
293 # Control Queue (in a Process)
299 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
300 q.bind_in(hub.client_info['control'])
295 q.bind_in(hub.client_info['control'])
301 q.setsockopt_in(zmq.IDENTITY, 'control')
296 q.setsockopt_in(zmq.IDENTITY, 'control')
302 q.bind_out(hub.engine_info['control'])
297 q.bind_out(hub.engine_info['control'])
303 q.connect_mon(hub.monitor_url)
298 q.connect_mon(hub.monitor_url)
304 q.daemon=True
299 q.daemon=True
305 children.append(q)
300 children.append(q)
306 try:
301 try:
307 scheme = self.config.TaskScheduler.scheme_name
302 scheme = self.config.TaskScheduler.scheme_name
308 except AttributeError:
303 except AttributeError:
309 scheme = TaskScheduler.scheme_name.get_default_value()
304 scheme = TaskScheduler.scheme_name.get_default_value()
310 # Task Queue (in a Process)
305 # Task Queue (in a Process)
311 if scheme == 'pure':
306 if scheme == 'pure':
312 self.log.warn("task::using pure XREQ Task scheduler")
307 self.log.warn("task::using pure XREQ Task scheduler")
313 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
314 # q.setsockopt_out(zmq.HWM, hub.hwm)
309 # q.setsockopt_out(zmq.HWM, hub.hwm)
315 q.bind_in(hub.client_info['task'][1])
310 q.bind_in(hub.client_info['task'][1])
316 q.setsockopt_in(zmq.IDENTITY, 'task')
311 q.setsockopt_in(zmq.IDENTITY, 'task')
317 q.bind_out(hub.engine_info['task'])
312 q.bind_out(hub.engine_info['task'])
318 q.connect_mon(hub.monitor_url)
313 q.connect_mon(hub.monitor_url)
319 q.daemon=True
314 q.daemon=True
320 children.append(q)
315 children.append(q)
321 elif scheme == 'none':
316 elif scheme == 'none':
322 self.log.warn("task::using no Task scheduler")
317 self.log.warn("task::using no Task scheduler")
323
318
324 else:
319 else:
325 self.log.info("task::using Python %s Task scheduler"%scheme)
320 self.log.info("task::using Python %s Task scheduler"%scheme)
326 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
327 hub.monitor_url, hub.client_info['notification'])
322 hub.monitor_url, hub.client_info['notification'])
328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
323 kwargs = dict(logname='scheduler', loglevel=self.log_level,
329 log_url = self.log_url, config=dict(self.config))
324 log_url = self.log_url, config=dict(self.config))
330 if 'Process' in self.mq_class:
325 if 'Process' in self.mq_class:
331 # run the Python scheduler in a Process
326 # run the Python scheduler in a Process
332 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
327 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
333 q.daemon=True
328 q.daemon=True
334 children.append(q)
329 children.append(q)
335 else:
330 else:
336 # single-threaded Controller
331 # single-threaded Controller
337 kwargs['in_thread'] = True
332 kwargs['in_thread'] = True
338 launch_scheduler(*sargs, **kwargs)
333 launch_scheduler(*sargs, **kwargs)
339
334
340
335
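
Each relay created in init_schedulers above is a pyzmq MonitoredQueue device: it shuttles messages between an 'in' and an 'out' socket while publishing a copy of all traffic on a monitor socket that the Hub subscribes to. A minimal standalone sketch of the pattern, with illustrative addresses:

    import zmq
    from zmq.devices import ProcessMonitoredQueue

    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in('tcp://127.0.0.1:5555')       # client-facing socket
    q.setsockopt_in(zmq.IDENTITY, 'mux')    # identity so replies route back
    q.bind_out('tcp://127.0.0.1:5556')      # engine-facing socket
    q.connect_mon('tcp://127.0.0.1:5557')   # monitor stream, read by the Hub
    q.daemon = True
    q.start()                               # runs the relay in a child process
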
341 def save_urls(self):
336 def save_urls(self):
342 """save the registration urls to files."""
337 """save the registration urls to files."""
343 c = self.config
338 c = self.config
344
339
345 sec_dir = self.profile_dir.security_dir
340 sec_dir = self.profile_dir.security_dir
346 cf = self.factory
341 cf = self.factory
347
342
348 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
343 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
349 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
344 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
350
345
351 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
346 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
352 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
347 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
353
348
354
349
355 def do_import_statements(self):
350 def do_import_statements(self):
356 statements = self.import_statements
351 statements = self.import_statements
357 for s in statements:
352 for s in statements:
358 try:
353 try:
359 self.log.info("Executing statement: '%s'" % s)
354 self.log.info("Executing statement: '%s'" % s)
360 exec s in globals(), locals()
355 exec s in globals(), locals()
361 except:
356 except:
362 self.log.error("Error running statement: %s" % s)
357 self.log.error("Error running statement: %s" % s)
363
358
364 def forward_logging(self):
359 def forward_logging(self):
365 if self.log_url:
360 if self.log_url:
366 self.log.info("Forwarding logging to %s"%self.log_url)
361 self.log.info("Forwarding logging to %s"%self.log_url)
367 context = zmq.Context.instance()
362 context = zmq.Context.instance()
368 lsock = context.socket(zmq.PUB)
363 lsock = context.socket(zmq.PUB)
369 lsock.connect(self.log_url)
364 lsock.connect(self.log_url)
370 handler = PUBHandler(lsock)
365 handler = PUBHandler(lsock)
371 self.log.removeHandler(self._log_handler)
366 self.log.removeHandler(self._log_handler)
372 handler.root_topic = 'controller'
367 handler.root_topic = 'controller'
373 handler.setLevel(self.log_level)
368 handler.setLevel(self.log_level)
374 self.log.addHandler(handler)
369 self.log.addHandler(handler)
375 self._log_handler = handler
370 self._log_handler = handler
376 # #
371 # #
377
372
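
forward_logging above swaps the app's local log handler for a zmq PUBHandler, so log records are published to the --log-url address where an iplogger instance can collect them. A minimal sketch of the same wiring on its own, with an illustrative URL:

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    context = zmq.Context.instance()
    lsock = context.socket(zmq.PUB)
    lsock.connect('tcp://127.0.0.1:20202')  # illustrative --log-url
    handler = PUBHandler(lsock)
    handler.root_topic = 'controller'       # topic prefix on published records
    handler.setLevel(logging.INFO)
    logging.getLogger('ipcontroller-demo').addHandler(handler)
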
378 def initialize(self, argv=None):
373 def initialize(self, argv=None):
379 super(IPControllerApp, self).initialize(argv)
374 super(IPControllerApp, self).initialize(argv)
380 self.forward_logging()
375 self.forward_logging()
381 self.init_hub()
376 self.init_hub()
382 self.init_schedulers()
377 self.init_schedulers()
383
378
384 def start(self):
379 def start(self):
385 # Start the subprocesses:
380 # Start the subprocesses:
386 self.factory.start()
381 self.factory.start()
387 child_procs = []
382 child_procs = []
388 for child in self.children:
383 for child in self.children:
389 child.start()
384 child.start()
390 if isinstance(child, ProcessMonitoredQueue):
385 if isinstance(child, ProcessMonitoredQueue):
391 child_procs.append(child.launcher)
386 child_procs.append(child.launcher)
392 elif isinstance(child, Process):
387 elif isinstance(child, Process):
393 child_procs.append(child)
388 child_procs.append(child)
394 if child_procs:
389 if child_procs:
395 signal_children(child_procs)
390 signal_children(child_procs)
396
391
397 self.write_pid_file(overwrite=True)
392 self.write_pid_file(overwrite=True)
398
393
399 try:
394 try:
400 self.factory.loop.start()
395 self.factory.loop.start()
401 except KeyboardInterrupt:
396 except KeyboardInterrupt:
402 self.log.critical("Interrupted, Exiting...\n")
397 self.log.critical("Interrupted, Exiting...\n")
403
398
404
399
405
400
406 def launch_new_instance():
401 def launch_new_instance():
407 """Create and run the IPython controller"""
402 """Create and run the IPython controller"""
408 if sys.platform == 'win32':
403 if sys.platform == 'win32':
409 # make sure we don't get called from a multiprocessing subprocess
404 # make sure we don't get called from a multiprocessing subprocess
410 # this can result in infinite Controllers being started on Windows
405 # this can result in infinite Controllers being started on Windows
411 # which doesn't have a proper fork, so multiprocessing is wonky
406 # which doesn't have a proper fork, so multiprocessing is wonky
412
407
413 # this only comes up when IPython has been installed using vanilla
408 # this only comes up when IPython has been installed using vanilla
414 # setuptools, and *not* distribute.
409 # setuptools, and *not* distribute.
415 import multiprocessing
410 import multiprocessing
416 p = multiprocessing.current_process()
411 p = multiprocessing.current_process()
417 # the main process has name 'MainProcess'
412 # the main process has name 'MainProcess'
418 # subprocesses will have names like 'Process-1'
413 # subprocesses will have names like 'Process-1'
419 if p.name != 'MainProcess':
414 if p.name != 'MainProcess':
420 # we are a subprocess, don't start another Controller!
415 # we are a subprocess, don't start another Controller!
421 return
416 return
422 app = IPControllerApp.instance()
417 app = IPControllerApp.instance()
423 app.initialize()
418 app.initialize()
424 app.start()
419 app.start()
425
420
426
421
427 if __name__ == '__main__':
422 if __name__ == '__main__':
428 launch_new_instance()
423 launch_new_instance()
@@ -1,276 +1,276 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27
27
28 import zmq
28 import zmq
29 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
30
30
31 from IPython.core.profiledir import ProfileDir
31 from IPython.core.profiledir import ProfileDir
32 from IPython.parallel.apps.baseapp import BaseParallelApplication
32 from IPython.parallel.apps.baseapp import (
33 BaseParallelApplication,
34 base_aliases,
35 base_flags,
36 )
33 from IPython.zmq.log import EnginePUBHandler
37 from IPython.zmq.log import EnginePUBHandler
34
38
35 from IPython.config.configurable import Configurable
39 from IPython.config.configurable import Configurable
36 from IPython.zmq.session import Session
40 from IPython.zmq.session import Session
37 from IPython.parallel.engine.engine import EngineFactory
41 from IPython.parallel.engine.engine import EngineFactory
38 from IPython.parallel.engine.streamkernel import Kernel
42 from IPython.parallel.engine.streamkernel import Kernel
39 from IPython.parallel.util import disambiguate_url
43 from IPython.parallel.util import disambiguate_url
40
44
41 from IPython.utils.importstring import import_item
45 from IPython.utils.importstring import import_item
42 from IPython.utils.traitlets import Bool, Unicode, Dict, List
46 from IPython.utils.traitlets import Bool, Unicode, Dict, List
43
47
44
48
45 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
46 # Module level variables
50 # Module level variables
47 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
48
52
49 #: The default config file name for this application
53 #: The default config file name for this application
50 default_config_file_name = u'ipengine_config.py'
54 default_config_file_name = u'ipengine_config.py'
51
55
52 _description = """Start an IPython engine for parallel computing.
56 _description = """Start an IPython engine for parallel computing.
53
57
54 IPython engines run in parallel and perform computations on behalf of a client
58 IPython engines run in parallel and perform computations on behalf of a client
55 and controller. A controller needs to be started before the engines. The
59 and controller. A controller needs to be started before the engines. The
56 engine can be configured using command line options or using a cluster
60 engine can be configured using command line options or using a cluster
57 directory. Cluster directories contain config, log and security files and are
61 directory. Cluster directories contain config, log and security files and are
58 usually located in your ipython directory and named as "profile_name".
62 usually located in your ipython directory and named as "profile_name".
59 See the `profile` and `profile_dir` options for details.
63 See the `profile` and `profile_dir` options for details.
60 """
64 """
61
65
62
66
63 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
64 # MPI configuration
68 # MPI configuration
65 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
66
70
67 mpi4py_init = """from mpi4py import MPI as mpi
71 mpi4py_init = """from mpi4py import MPI as mpi
68 mpi.size = mpi.COMM_WORLD.Get_size()
72 mpi.size = mpi.COMM_WORLD.Get_size()
69 mpi.rank = mpi.COMM_WORLD.Get_rank()
73 mpi.rank = mpi.COMM_WORLD.Get_rank()
70 """
74 """
71
75
72
76
73 pytrilinos_init = """from PyTrilinos import Epetra
77 pytrilinos_init = """from PyTrilinos import Epetra
74 class SimpleStruct:
78 class SimpleStruct:
75 pass
79 pass
76 mpi = SimpleStruct()
80 mpi = SimpleStruct()
77 mpi.rank = 0
81 mpi.rank = 0
78 mpi.size = 0
82 mpi.size = 0
79 """
83 """
80
84
81 class MPI(Configurable):
85 class MPI(Configurable):
82 """Configurable for MPI initialization"""
86 """Configurable for MPI initialization"""
83 use = Unicode('', config=True,
87 use = Unicode('', config=True,
84 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
88 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
85 )
89 )
86
90
87 def _on_use_changed(self, old, new):
91 def _on_use_changed(self, old, new):
88 # load default init script if it's not set
92 # load default init script if it's not set
89 if not self.init_script:
93 if not self.init_script:
90 self.init_script = self.default_inits.get(new, '')
94 self.init_script = self.default_inits.get(new, '')
91
95
92 init_script = Unicode('', config=True,
96 init_script = Unicode('', config=True,
93 help="Initialization code for MPI")
97 help="Initialization code for MPI")
94
98
95 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
99 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
96 config=True)
100 config=True)
97
101
98
102
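
The MPI configurable above only selects an initialization snippet; a hedged sketch of enabling mpi4py from a profile's ipengine_config.py (equivalent to passing --mpi=mpi4py on the command line):

    c = get_config()
    c.MPI.use = 'mpi4py'
    # with no explicit MPI.init_script, the default_inits table above supplies
    # the mpi4py_init snippet, which init_mpi() executes at engine startup
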
99 #-----------------------------------------------------------------------------
103 #-----------------------------------------------------------------------------
100 # Main application
104 # Main application
101 #-----------------------------------------------------------------------------
105 #-----------------------------------------------------------------------------
106 aliases = dict(
107 file = 'IPEngineApp.url_file',
108 c = 'IPEngineApp.startup_command',
109 s = 'IPEngineApp.startup_script',
102
110
111 ident = 'Session.session',
112 user = 'Session.username',
113 exec_key = 'Session.keyfile',
114
115 url = 'EngineFactory.url',
116 ip = 'EngineFactory.ip',
117 transport = 'EngineFactory.transport',
118 port = 'EngineFactory.regport',
119 location = 'EngineFactory.location',
120
121 timeout = 'EngineFactory.timeout',
122
123 mpi = 'MPI.use',
124
125 )
126 aliases.update(base_aliases)
103
127
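
As in the controller app, the engine's aliases are now defined at module level and merged with base_aliases. A hedged command-line sketch (the path and values are illustrative):

    #   ipengine --file=/path/to/ipcontroller-engine.json --timeout=5 --mpi=mpi4py
    # resolves, through the alias table above, to:
    c = get_config()
    c.IPEngineApp.url_file = u'/path/to/ipcontroller-engine.json'
    c.EngineFactory.timeout = 5
    c.MPI.use = 'mpi4py'
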
104 class IPEngineApp(BaseParallelApplication):
128 class IPEngineApp(BaseParallelApplication):
105
129
106 name = Unicode(u'ipengine')
130 name = Unicode(u'ipengine')
107 description = Unicode(_description)
131 description = Unicode(_description)
108 config_file_name = Unicode(default_config_file_name)
132 config_file_name = Unicode(default_config_file_name)
109 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
133 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
110
134
111 startup_script = Unicode(u'', config=True,
135 startup_script = Unicode(u'', config=True,
112 help='specify a script to be run at startup')
136 help='specify a script to be run at startup')
113 startup_command = Unicode('', config=True,
137 startup_command = Unicode('', config=True,
114 help='specify a command to be run at startup')
138 help='specify a command to be run at startup')
115
139
116 url_file = Unicode(u'', config=True,
140 url_file = Unicode(u'', config=True,
117 help="""The full location of the file containing the connection information for
141 help="""The full location of the file containing the connection information for
118 the controller. If this is not given, the file must be in the
142 the controller. If this is not given, the file must be in the
119 security directory of the cluster directory. This location is
143 security directory of the cluster directory. This location is
120 resolved using the `profile` or `profile_dir` options.""",
144 resolved using the `profile` or `profile_dir` options.""",
121 )
145 )
122
146
123 url_file_name = Unicode(u'ipcontroller-engine.json')
147 url_file_name = Unicode(u'ipcontroller-engine.json')
124 log_url = Unicode('', config=True,
148 log_url = Unicode('', config=True,
125 help="""The URL for the iploggerapp instance, for forwarding
149 help="""The URL for the iploggerapp instance, for forwarding
126 logging to a central location.""")
150 logging to a central location.""")
127
151
128 aliases = Dict(dict(
152 aliases = Dict(aliases)
129 file = 'IPEngineApp.url_file',
130 c = 'IPEngineApp.startup_command',
131 s = 'IPEngineApp.startup_script',
132
133 ident = 'Session.session',
134 user = 'Session.username',
135 exec_key = 'Session.keyfile',
136
137 url = 'EngineFactory.url',
138 ip = 'EngineFactory.ip',
139 transport = 'EngineFactory.transport',
140 port = 'EngineFactory.regport',
141 location = 'EngineFactory.location',
142
143 timeout = 'EngineFactory.timeout',
144
145 profile = "IPEngineApp.profile",
146 profile_dir = 'ProfileDir.location',
147
148 mpi = 'MPI.use',
149
150 log_level = 'IPEngineApp.log_level',
151 log_url = 'IPEngineApp.log_url'
152 ))
153
153
154 # def find_key_file(self):
154 # def find_key_file(self):
155 # """Set the key file.
155 # """Set the key file.
156 #
156 #
157 # Here we don't try to actually see if it exists or is valid, as that
157 # Here we don't try to actually see if it exists or is valid, as that
158 # is handled by the connection logic.
158 # is handled by the connection logic.
159 # """
159 # """
160 # config = self.master_config
160 # config = self.master_config
161 # # Find the actual controller key file
161 # # Find the actual controller key file
162 # if not config.Global.key_file:
162 # if not config.Global.key_file:
163 # try_this = os.path.join(
163 # try_this = os.path.join(
164 # config.Global.profile_dir,
164 # config.Global.profile_dir,
165 # config.Global.security_dir,
165 # config.Global.security_dir,
166 # config.Global.key_file_name
166 # config.Global.key_file_name
167 # )
167 # )
168 # config.Global.key_file = try_this
168 # config.Global.key_file = try_this
169
169
170 def find_url_file(self):
170 def find_url_file(self):
171 """Set the key file.
171 """Set the key file.
172
172
173 Here we don't try to actually see if it exists or is valid, as that
173 Here we don't try to actually see if it exists or is valid, as that
174 is handled by the connection logic.
174 is handled by the connection logic.
175 """
175 """
176 config = self.config
176 config = self.config
177 # Find the actual controller key file
177 # Find the actual controller key file
178 if not self.url_file:
178 if not self.url_file:
179 self.url_file = os.path.join(
179 self.url_file = os.path.join(
180 self.profile_dir.security_dir,
180 self.profile_dir.security_dir,
181 self.url_file_name
181 self.url_file_name
182 )
182 )
183 def init_engine(self):
183 def init_engine(self):
184 # This is the working dir by now.
184 # This is the working dir by now.
185 sys.path.insert(0, '')
185 sys.path.insert(0, '')
186 config = self.config
186 config = self.config
187 # print config
187 # print config
188 self.find_url_file()
188 self.find_url_file()
189
189
190 # if os.path.exists(config.Global.key_file) and config.Global.secure:
190 # if os.path.exists(config.Global.key_file) and config.Global.secure:
191 # config.SessionFactory.exec_key = config.Global.key_file
191 # config.SessionFactory.exec_key = config.Global.key_file
192 if os.path.exists(self.url_file):
192 if os.path.exists(self.url_file):
193 with open(self.url_file) as f:
193 with open(self.url_file) as f:
194 d = json.loads(f.read())
194 d = json.loads(f.read())
195 for k,v in d.iteritems():
195 for k,v in d.iteritems():
196 if isinstance(v, unicode):
196 if isinstance(v, unicode):
197 d[k] = v.encode()
197 d[k] = v.encode()
198 if d['exec_key']:
198 if d['exec_key']:
199 config.Session.key = d['exec_key']
199 config.Session.key = d['exec_key']
200 d['url'] = disambiguate_url(d['url'], d['location'])
200 d['url'] = disambiguate_url(d['url'], d['location'])
201 config.EngineFactory.url = d['url']
201 config.EngineFactory.url = d['url']
202 config.EngineFactory.location = d['location']
202 config.EngineFactory.location = d['location']
203
203
204 try:
204 try:
205 exec_lines = config.Kernel.exec_lines
205 exec_lines = config.Kernel.exec_lines
206 except AttributeError:
206 except AttributeError:
207 config.Kernel.exec_lines = []
207 config.Kernel.exec_lines = []
208 exec_lines = config.Kernel.exec_lines
208 exec_lines = config.Kernel.exec_lines
209
209
210 if self.startup_script:
210 if self.startup_script:
211 enc = sys.getfilesystemencoding() or 'utf8'
211 enc = sys.getfilesystemencoding() or 'utf8'
212 cmd="execfile(%r)"%self.startup_script.encode(enc)
212 cmd="execfile(%r)"%self.startup_script.encode(enc)
213 exec_lines.append(cmd)
213 exec_lines.append(cmd)
214 if self.startup_command:
214 if self.startup_command:
215 exec_lines.append(self.startup_command)
215 exec_lines.append(self.startup_command)
216
216
217 # Create the underlying shell class and Engine
217 # Create the underlying shell class and Engine
218 # shell_class = import_item(self.master_config.Global.shell_class)
218 # shell_class = import_item(self.master_config.Global.shell_class)
219 # print self.config
219 # print self.config
220 try:
220 try:
221 self.engine = EngineFactory(config=config, log=self.log)
221 self.engine = EngineFactory(config=config, log=self.log)
222 except:
222 except:
223 self.log.error("Couldn't start the Engine", exc_info=True)
223 self.log.error("Couldn't start the Engine", exc_info=True)
224 self.exit(1)
224 self.exit(1)
225
225
226 def forward_logging(self):
226 def forward_logging(self):
227 if self.log_url:
227 if self.log_url:
228 self.log.info("Forwarding logging to %s"%self.log_url)
228 self.log.info("Forwarding logging to %s"%self.log_url)
229 context = self.engine.context
229 context = self.engine.context
230 lsock = context.socket(zmq.PUB)
230 lsock = context.socket(zmq.PUB)
231 lsock.connect(self.log_url)
231 lsock.connect(self.log_url)
232 self.log.removeHandler(self._log_handler)
232 self.log.removeHandler(self._log_handler)
233 handler = EnginePUBHandler(self.engine, lsock)
233 handler = EnginePUBHandler(self.engine, lsock)
234 handler.setLevel(self.log_level)
234 handler.setLevel(self.log_level)
235 self.log.addHandler(handler)
235 self.log.addHandler(handler)
236 self._log_handler = handler
236 self._log_handler = handler
237 #
237 #
238 def init_mpi(self):
238 def init_mpi(self):
239 global mpi
239 global mpi
240 self.mpi = MPI(config=self.config)
240 self.mpi = MPI(config=self.config)
241
241
242 mpi_import_statement = self.mpi.init_script
242 mpi_import_statement = self.mpi.init_script
243 if mpi_import_statement:
243 if mpi_import_statement:
244 try:
244 try:
245 self.log.info("Initializing MPI:")
245 self.log.info("Initializing MPI:")
246 self.log.info(mpi_import_statement)
246 self.log.info(mpi_import_statement)
247 exec mpi_import_statement in globals()
247 exec mpi_import_statement in globals()
248 except:
248 except:
249 mpi = None
249 mpi = None
250 else:
250 else:
251 mpi = None
251 mpi = None
252
252
253 def initialize(self, argv=None):
253 def initialize(self, argv=None):
254 super(IPEngineApp, self).initialize(argv)
254 super(IPEngineApp, self).initialize(argv)
255 self.init_mpi()
255 self.init_mpi()
256 self.init_engine()
256 self.init_engine()
257 self.forward_logging()
257 self.forward_logging()
258
258
259 def start(self):
259 def start(self):
260 self.engine.start()
260 self.engine.start()
261 try:
261 try:
262 self.engine.loop.start()
262 self.engine.loop.start()
263 except KeyboardInterrupt:
263 except KeyboardInterrupt:
264 self.log.critical("Engine Interrupted, shutting down...\n")
264 self.log.critical("Engine Interrupted, shutting down...\n")
265
265
266
266
267 def launch_new_instance():
267 def launch_new_instance():
268 """Create and run the IPython engine"""
268 """Create and run the IPython engine"""
269 app = IPEngineApp.instance()
269 app = IPEngineApp.instance()
270 app.initialize()
270 app.initialize()
271 app.start()
271 app.start()
272
272
273
273
274 if __name__ == '__main__':
274 if __name__ == '__main__':
275 launch_new_instance()
275 launch_new_instance()
276
276