##// END OF EJS Templates
parallel.apps cleanup per review...
MinRK -
Show More
@@ -1,257 +1,261 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 The Base Application class for IPython.parallel apps
3 The Base Application class for IPython.parallel apps
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * Min RK
8 * Min RK
9
9
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 from __future__ import with_statement
23 from __future__ import with_statement
24
24
25 import os
25 import os
26 import logging
26 import logging
27 import re
27 import re
28 import sys
28 import sys
29
29
30 from subprocess import Popen, PIPE
30 from subprocess import Popen, PIPE
31
31
32 from IPython.core import release
32 from IPython.core import release
33 from IPython.core.crashhandler import CrashHandler
33 from IPython.core.crashhandler import CrashHandler
34 from IPython.core.application import (
34 from IPython.core.application import (
35 BaseIPythonApplication,
35 BaseIPythonApplication,
36 base_aliases as base_ip_aliases,
36 base_aliases as base_ip_aliases,
37 base_flags as base_ip_flags
37 base_flags as base_ip_flags
38 )
38 )
39 from IPython.utils.path import expand_path
39 from IPython.utils.path import expand_path
40
40
41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Module errors
44 # Module errors
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 class PIDFileError(Exception):
47 class PIDFileError(Exception):
48 pass
48 pass
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Crash handler for this application
52 # Crash handler for this application
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55 class ParallelCrashHandler(CrashHandler):
55 class ParallelCrashHandler(CrashHandler):
56 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
56 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
57
57
58 def __init__(self, app):
58 def __init__(self, app):
59 contact_name = release.authors['Min'][0]
59 contact_name = release.authors['Min'][0]
60 contact_email = release.authors['Min'][1]
60 contact_email = release.authors['Min'][1]
61 bug_tracker = 'http://github.com/ipython/ipython/issues'
61 bug_tracker = 'http://github.com/ipython/ipython/issues'
62 super(ParallelCrashHandler,self).__init__(
62 super(ParallelCrashHandler,self).__init__(
63 app, contact_name, contact_email, bug_tracker
63 app, contact_name, contact_email, bug_tracker
64 )
64 )
65
65
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # Main application
68 # Main application
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70 base_aliases = {}
70 base_aliases = {}
71 base_aliases.update(base_ip_aliases)
71 base_aliases.update(base_ip_aliases)
72 base_aliases.update({
72 base_aliases.update({
73 'profile-dir' : 'ProfileDir.location',
73 'profile-dir' : 'ProfileDir.location',
74 'work-dir' : 'BaseParallelApplication.work_dir',
74 'work-dir' : 'BaseParallelApplication.work_dir',
75 'log-to-file' : 'BaseParallelApplication.log_to_file',
75 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 'clean-logs' : 'BaseParallelApplication.clean_logs',
76 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 'log-url' : 'BaseParallelApplication.log_url',
77 'log-url' : 'BaseParallelApplication.log_url',
78 'cluster-id' : 'BaseParallelApplication.cluster_id',
78 'cluster-id' : 'BaseParallelApplication.cluster_id',
79 })
79 })
80
80
81 base_flags = {
81 base_flags = {
82 'log-to-file' : (
82 'log-to-file' : (
83 {'BaseParallelApplication' : {'log_to_file' : True}},
83 {'BaseParallelApplication' : {'log_to_file' : True}},
84 "send log output to a file"
84 "send log output to a file"
85 )
85 )
86 }
86 }
87 base_flags.update(base_ip_flags)
87 base_flags.update(base_ip_flags)
88
88
89 class BaseParallelApplication(BaseIPythonApplication):
89 class BaseParallelApplication(BaseIPythonApplication):
90 """The base Application for IPython.parallel apps
90 """The base Application for IPython.parallel apps
91
91
92 Principle extensions to BaseIPyythonApplication:
92 Principle extensions to BaseIPyythonApplication:
93
93
94 * work_dir
94 * work_dir
95 * remote logging via pyzmq
95 * remote logging via pyzmq
96 * IOLoop instance
96 * IOLoop instance
97 """
97 """
98
98
99 crash_handler_class = ParallelCrashHandler
99 crash_handler_class = ParallelCrashHandler
100
100
101 def _log_level_default(self):
101 def _log_level_default(self):
102 # temporarily override default_log_level to INFO
102 # temporarily override default_log_level to INFO
103 return logging.INFO
103 return logging.INFO
104
104
105 work_dir = Unicode(os.getcwdu(), config=True,
105 work_dir = Unicode(os.getcwdu(), config=True,
106 help='Set the working dir for the process.'
106 help='Set the working dir for the process.'
107 )
107 )
108 def _work_dir_changed(self, name, old, new):
108 def _work_dir_changed(self, name, old, new):
109 self.work_dir = unicode(expand_path(new))
109 self.work_dir = unicode(expand_path(new))
110
110
111 log_to_file = Bool(config=True,
111 log_to_file = Bool(config=True,
112 help="whether to log to a file")
112 help="whether to log to a file")
113
113
114 clean_logs = Bool(False, config=True,
114 clean_logs = Bool(False, config=True,
115 help="whether to cleanup old logfiles before starting")
115 help="whether to cleanup old logfiles before starting")
116
116
117 log_url = Unicode('', config=True,
117 log_url = Unicode('', config=True,
118 help="The ZMQ URL of the iplogger to aggregate logging.")
118 help="The ZMQ URL of the iplogger to aggregate logging.")
119
119
120 cluster_id = Unicode('', config=True,
120 cluster_id = Unicode('', config=True,
121 help="""String id to add to runtime files, to prevent name collisions when
121 help="""String id to add to runtime files, to prevent name collisions when
122 using multiple clusters with a single profile.
122 using multiple clusters with a single profile simultaneously.
123
123
124 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
124 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
125
126 Since this is text inserted into filenames, typical recommendations apply:
127 Simple character strings are ideal, and spaces are not recommended (but should
128 generally work).
125 """
129 """
126 )
130 )
127 def _cluster_id_changed(self, name, old, new):
131 def _cluster_id_changed(self, name, old, new):
128 self.name = self.__class__.name
132 self.name = self.__class__.name
129 if new:
133 if new:
130 self.name += '-%s'%new
134 self.name += '-%s'%new
131
135
132 def _config_files_default(self):
136 def _config_files_default(self):
133 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
137 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
134
138
135 loop = Instance('zmq.eventloop.ioloop.IOLoop')
139 loop = Instance('zmq.eventloop.ioloop.IOLoop')
136 def _loop_default(self):
140 def _loop_default(self):
137 from zmq.eventloop.ioloop import IOLoop
141 from zmq.eventloop.ioloop import IOLoop
138 return IOLoop.instance()
142 return IOLoop.instance()
139
143
140 aliases = Dict(base_aliases)
144 aliases = Dict(base_aliases)
141 flags = Dict(base_flags)
145 flags = Dict(base_flags)
142
146
143 def initialize(self, argv=None):
147 def initialize(self, argv=None):
144 """initialize the app"""
148 """initialize the app"""
145 super(BaseParallelApplication, self).initialize(argv)
149 super(BaseParallelApplication, self).initialize(argv)
146 self.to_work_dir()
150 self.to_work_dir()
147 self.reinit_logging()
151 self.reinit_logging()
148
152
149 def to_work_dir(self):
153 def to_work_dir(self):
150 wd = self.work_dir
154 wd = self.work_dir
151 if unicode(wd) != os.getcwdu():
155 if unicode(wd) != os.getcwdu():
152 os.chdir(wd)
156 os.chdir(wd)
153 self.log.info("Changing to working dir: %s" % wd)
157 self.log.info("Changing to working dir: %s" % wd)
154 # This is the working dir by now.
158 # This is the working dir by now.
155 sys.path.insert(0, '')
159 sys.path.insert(0, '')
156
160
157 def reinit_logging(self):
161 def reinit_logging(self):
158 # Remove old log files
162 # Remove old log files
159 log_dir = self.profile_dir.log_dir
163 log_dir = self.profile_dir.log_dir
160 if self.clean_logs:
164 if self.clean_logs:
161 for f in os.listdir(log_dir):
165 for f in os.listdir(log_dir):
162 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
166 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
163 os.remove(os.path.join(log_dir, f))
167 os.remove(os.path.join(log_dir, f))
164 if self.log_to_file:
168 if self.log_to_file:
165 # Start logging to the new log file
169 # Start logging to the new log file
166 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
170 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
167 logfile = os.path.join(log_dir, log_filename)
171 logfile = os.path.join(log_dir, log_filename)
168 open_log_file = open(logfile, 'w')
172 open_log_file = open(logfile, 'w')
169 else:
173 else:
170 open_log_file = None
174 open_log_file = None
171 if open_log_file is not None:
175 if open_log_file is not None:
172 self.log.removeHandler(self._log_handler)
176 self.log.removeHandler(self._log_handler)
173 self._log_handler = logging.StreamHandler(open_log_file)
177 self._log_handler = logging.StreamHandler(open_log_file)
174 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
178 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
175 self._log_handler.setFormatter(self._log_formatter)
179 self._log_handler.setFormatter(self._log_formatter)
176 self.log.addHandler(self._log_handler)
180 self.log.addHandler(self._log_handler)
177 # do not propagate log messages to root logger
181 # do not propagate log messages to root logger
178 # ipcluster app will sometimes print duplicate messages during shutdown
182 # ipcluster app will sometimes print duplicate messages during shutdown
179 # if this is 1 (default):
183 # if this is 1 (default):
180 self.log.propagate = False
184 self.log.propagate = False
181
185
182 def write_pid_file(self, overwrite=False):
186 def write_pid_file(self, overwrite=False):
183 """Create a .pid file in the pid_dir with my pid.
187 """Create a .pid file in the pid_dir with my pid.
184
188
185 This must be called after pre_construct, which sets `self.pid_dir`.
189 This must be called after pre_construct, which sets `self.pid_dir`.
186 This raises :exc:`PIDFileError` if the pid file exists already.
190 This raises :exc:`PIDFileError` if the pid file exists already.
187 """
191 """
188 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
189 if os.path.isfile(pid_file):
193 if os.path.isfile(pid_file):
190 pid = self.get_pid_from_file()
194 pid = self.get_pid_from_file()
191 if not overwrite:
195 if not overwrite:
192 raise PIDFileError(
196 raise PIDFileError(
193 'The pid file [%s] already exists. \nThis could mean that this '
197 'The pid file [%s] already exists. \nThis could mean that this '
194 'server is already running with [pid=%s].' % (pid_file, pid)
198 'server is already running with [pid=%s].' % (pid_file, pid)
195 )
199 )
196 with open(pid_file, 'w') as f:
200 with open(pid_file, 'w') as f:
197 self.log.info("Creating pid file: %s" % pid_file)
201 self.log.info("Creating pid file: %s" % pid_file)
198 f.write(repr(os.getpid())+'\n')
202 f.write(repr(os.getpid())+'\n')
199
203
200 def remove_pid_file(self):
204 def remove_pid_file(self):
201 """Remove the pid file.
205 """Remove the pid file.
202
206
203 This should be called at shutdown by registering a callback with
207 This should be called at shutdown by registering a callback with
204 :func:`reactor.addSystemEventTrigger`. This needs to return
208 :func:`reactor.addSystemEventTrigger`. This needs to return
205 ``None``.
209 ``None``.
206 """
210 """
207 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 if os.path.isfile(pid_file):
212 if os.path.isfile(pid_file):
209 try:
213 try:
210 self.log.info("Removing pid file: %s" % pid_file)
214 self.log.info("Removing pid file: %s" % pid_file)
211 os.remove(pid_file)
215 os.remove(pid_file)
212 except:
216 except:
213 self.log.warn("Error removing the pid file: %s" % pid_file)
217 self.log.warn("Error removing the pid file: %s" % pid_file)
214
218
215 def get_pid_from_file(self):
219 def get_pid_from_file(self):
216 """Get the pid from the pid file.
220 """Get the pid from the pid file.
217
221
218 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
219 """
223 """
220 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
221 if os.path.isfile(pid_file):
225 if os.path.isfile(pid_file):
222 with open(pid_file, 'r') as f:
226 with open(pid_file, 'r') as f:
223 s = f.read().strip()
227 s = f.read().strip()
224 try:
228 try:
225 pid = int(s)
229 pid = int(s)
226 except:
230 except:
227 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
231 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
228 return pid
232 return pid
229 else:
233 else:
230 raise PIDFileError('pid file not found: %s' % pid_file)
234 raise PIDFileError('pid file not found: %s' % pid_file)
231
235
232 def check_pid(self, pid):
236 def check_pid(self, pid):
233 if os.name == 'nt':
237 if os.name == 'nt':
234 try:
238 try:
235 import ctypes
239 import ctypes
236 # returns 0 if no such process (of ours) exists
240 # returns 0 if no such process (of ours) exists
237 # positive int otherwise
241 # positive int otherwise
238 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
242 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
239 except Exception:
243 except Exception:
240 self.log.warn(
244 self.log.warn(
241 "Could not determine whether pid %i is running via `OpenProcess`. "
245 "Could not determine whether pid %i is running via `OpenProcess`. "
242 " Making the likely assumption that it is."%pid
246 " Making the likely assumption that it is."%pid
243 )
247 )
244 return True
248 return True
245 return bool(p)
249 return bool(p)
246 else:
250 else:
247 try:
251 try:
248 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
252 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
249 output,_ = p.communicate()
253 output,_ = p.communicate()
250 except OSError:
254 except OSError:
251 self.log.warn(
255 self.log.warn(
252 "Could not determine whether pid %i is running via `ps x`. "
256 "Could not determine whether pid %i is running via `ps x`. "
253 " Making the likely assumption that it is."%pid
257 " Making the likely assumption that it is."%pid
254 )
258 )
255 return True
259 return True
256 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
260 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
257 return pid in pids
261 return pid in pids
1 NO CONTENT: modified file
NO CONTENT: modified file
@@ -1,344 +1,344 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 )
37 )
38 from IPython.zmq.log import EnginePUBHandler
38 from IPython.zmq.log import EnginePUBHandler
39
39
40 from IPython.config.configurable import Configurable
40 from IPython.config.configurable import Configurable
41 from IPython.zmq.session import Session
41 from IPython.zmq.session import Session
42 from IPython.parallel.engine.engine import EngineFactory
42 from IPython.parallel.engine.engine import EngineFactory
43 from IPython.parallel.engine.streamkernel import Kernel
43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url, asbytes
44 from IPython.parallel.util import disambiguate_url, asbytes
45
45
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Module level variables
51 # Module level variables
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53
53
54 #: The default config file name for this application
54 #: The default config file name for this application
55 default_config_file_name = u'ipengine_config.py'
55 default_config_file_name = u'ipengine_config.py'
56
56
57 _description = """Start an IPython engine for parallel computing.
57 _description = """Start an IPython engine for parallel computing.
58
58
59 IPython engines run in parallel and perform computations on behalf of a client
59 IPython engines run in parallel and perform computations on behalf of a client
60 and controller. A controller needs to be started before the engines. The
60 and controller. A controller needs to be started before the engines. The
61 engine can be configured using command line options or using a cluster
61 engine can be configured using command line options or using a cluster
62 directory. Cluster directories contain config, log and security files and are
62 directory. Cluster directories contain config, log and security files and are
63 usually located in your ipython directory and named as "profile_name".
63 usually located in your ipython directory and named as "profile_name".
64 See the `profile` and `profile-dir` options for details.
64 See the `profile` and `profile-dir` options for details.
65 """
65 """
66
66
67 _examples = """
67 _examples = """
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
70 """
70 """
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # MPI configuration
73 # MPI configuration
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76 mpi4py_init = """from mpi4py import MPI as mpi
76 mpi4py_init = """from mpi4py import MPI as mpi
77 mpi.size = mpi.COMM_WORLD.Get_size()
77 mpi.size = mpi.COMM_WORLD.Get_size()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
79 """
79 """
80
80
81
81
82 pytrilinos_init = """from PyTrilinos import Epetra
82 pytrilinos_init = """from PyTrilinos import Epetra
83 class SimpleStruct:
83 class SimpleStruct:
84 pass
84 pass
85 mpi = SimpleStruct()
85 mpi = SimpleStruct()
86 mpi.rank = 0
86 mpi.rank = 0
87 mpi.size = 0
87 mpi.size = 0
88 """
88 """
89
89
90 class MPI(Configurable):
90 class MPI(Configurable):
91 """Configurable for MPI initialization"""
91 """Configurable for MPI initialization"""
92 use = Unicode('', config=True,
92 use = Unicode('', config=True,
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 )
94 )
95
95
96 def _on_use_changed(self, old, new):
96 def _use_changed(self, name, old, new):
97 # load default init script if it's not set
97 # load default init script if it's not set
98 if not self.init_script:
98 if not self.init_script:
99 self.init_script = self.default_inits.get(new, '')
99 self.init_script = self.default_inits.get(new, '')
100
100
101 init_script = Unicode('', config=True,
101 init_script = Unicode('', config=True,
102 help="Initialization code for MPI")
102 help="Initialization code for MPI")
103
103
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
105 config=True)
105 config=True)
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 aliases = dict(
111 aliases = dict(
112 file = 'IPEngineApp.url_file',
112 file = 'IPEngineApp.url_file',
113 c = 'IPEngineApp.startup_command',
113 c = 'IPEngineApp.startup_command',
114 s = 'IPEngineApp.startup_script',
114 s = 'IPEngineApp.startup_script',
115
115
116 ident = 'Session.session',
116 ident = 'Session.session',
117 user = 'Session.username',
117 user = 'Session.username',
118 keyfile = 'Session.keyfile',
118 keyfile = 'Session.keyfile',
119
119
120 url = 'EngineFactory.url',
120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
122 sshkey = 'EngineFactory.sshkey',
123 ip = 'EngineFactory.ip',
123 ip = 'EngineFactory.ip',
124 transport = 'EngineFactory.transport',
124 transport = 'EngineFactory.transport',
125 port = 'EngineFactory.regport',
125 port = 'EngineFactory.regport',
126 location = 'EngineFactory.location',
126 location = 'EngineFactory.location',
127
127
128 timeout = 'EngineFactory.timeout',
128 timeout = 'EngineFactory.timeout',
129
129
130 mpi = 'MPI.use',
130 mpi = 'MPI.use',
131
131
132 )
132 )
133 aliases.update(base_aliases)
133 aliases.update(base_aliases)
134
134
135
135
136 class IPEngineApp(BaseParallelApplication):
136 class IPEngineApp(BaseParallelApplication):
137
137
138 name = 'ipengine'
138 name = 'ipengine'
139 description = _description
139 description = _description
140 examples = _examples
140 examples = _examples
141 config_file_name = Unicode(default_config_file_name)
141 config_file_name = Unicode(default_config_file_name)
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
143
143
144 startup_script = Unicode(u'', config=True,
144 startup_script = Unicode(u'', config=True,
145 help='specify a script to be run at startup')
145 help='specify a script to be run at startup')
146 startup_command = Unicode('', config=True,
146 startup_command = Unicode('', config=True,
147 help='specify a command to be run at startup')
147 help='specify a command to be run at startup')
148
148
149 url_file = Unicode(u'', config=True,
149 url_file = Unicode(u'', config=True,
150 help="""The full location of the file containing the connection information for
150 help="""The full location of the file containing the connection information for
151 the controller. If this is not given, the file must be in the
151 the controller. If this is not given, the file must be in the
152 security directory of the cluster directory. This location is
152 security directory of the cluster directory. This location is
153 resolved using the `profile` or `profile_dir` options.""",
153 resolved using the `profile` or `profile_dir` options.""",
154 )
154 )
155 wait_for_url_file = Float(5, config=True,
155 wait_for_url_file = Float(5, config=True,
156 help="""The maximum number of seconds to wait for url_file to exist.
156 help="""The maximum number of seconds to wait for url_file to exist.
157 This is useful for batch-systems and shared-filesystems where the
157 This is useful for batch-systems and shared-filesystems where the
158 controller and engine are started at the same time and it
158 controller and engine are started at the same time and it
159 may take a moment for the controller to write the connector files.""")
159 may take a moment for the controller to write the connector files.""")
160
160
161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
162
162
163 def _cluster_id_changed(self, name, old, new):
163 def _cluster_id_changed(self, name, old, new):
164 if new:
164 if new:
165 base = 'ipcontroller-%s'%new
165 base = 'ipcontroller-%s' % new
166 else:
166 else:
167 base = 'ipcontroller'
167 base = 'ipcontroller'
168 self.url_file_name = "%s-engine.json"%base
168 self.url_file_name = "%s-engine.json" % base
169
169
170 log_url = Unicode('', config=True,
170 log_url = Unicode('', config=True,
171 help="""The URL for the iploggerapp instance, for forwarding
171 help="""The URL for the iploggerapp instance, for forwarding
172 logging to a central location.""")
172 logging to a central location.""")
173
173
174 aliases = Dict(aliases)
174 aliases = Dict(aliases)
175
175
176 # def find_key_file(self):
176 # def find_key_file(self):
177 # """Set the key file.
177 # """Set the key file.
178 #
178 #
179 # Here we don't try to actually see if it exists for is valid as that
179 # Here we don't try to actually see if it exists for is valid as that
180 # is hadled by the connection logic.
180 # is hadled by the connection logic.
181 # """
181 # """
182 # config = self.master_config
182 # config = self.master_config
183 # # Find the actual controller key file
183 # # Find the actual controller key file
184 # if not config.Global.key_file:
184 # if not config.Global.key_file:
185 # try_this = os.path.join(
185 # try_this = os.path.join(
186 # config.Global.profile_dir,
186 # config.Global.profile_dir,
187 # config.Global.security_dir,
187 # config.Global.security_dir,
188 # config.Global.key_file_name
188 # config.Global.key_file_name
189 # )
189 # )
190 # config.Global.key_file = try_this
190 # config.Global.key_file = try_this
191
191
192 def find_url_file(self):
192 def find_url_file(self):
193 """Set the url file.
193 """Set the url file.
194
194
195 Here we don't try to actually see if it exists for is valid as that
195 Here we don't try to actually see if it exists for is valid as that
196 is hadled by the connection logic.
196 is hadled by the connection logic.
197 """
197 """
198 config = self.config
198 config = self.config
199 # Find the actual controller key file
199 # Find the actual controller key file
200 if not self.url_file:
200 if not self.url_file:
201 self.url_file = os.path.join(
201 self.url_file = os.path.join(
202 self.profile_dir.security_dir,
202 self.profile_dir.security_dir,
203 self.url_file_name
203 self.url_file_name
204 )
204 )
205
205
206 def load_connector_file(self):
206 def load_connector_file(self):
207 """load config from a JSON connector file,
207 """load config from a JSON connector file,
208 at a *lower* priority than command-line/config files.
208 at a *lower* priority than command-line/config files.
209 """
209 """
210
210
211 self.log.info("Loading url_file %r"%self.url_file)
211 self.log.info("Loading url_file %r"%self.url_file)
212 config = self.config
212 config = self.config
213
213
214 with open(self.url_file) as f:
214 with open(self.url_file) as f:
215 d = json.loads(f.read())
215 d = json.loads(f.read())
216
216
217 try:
217 try:
218 config.Session.key
218 config.Session.key
219 except AttributeError:
219 except AttributeError:
220 if d['exec_key']:
220 if d['exec_key']:
221 config.Session.key = asbytes(d['exec_key'])
221 config.Session.key = asbytes(d['exec_key'])
222
222
223 try:
223 try:
224 config.EngineFactory.location
224 config.EngineFactory.location
225 except AttributeError:
225 except AttributeError:
226 config.EngineFactory.location = d['location']
226 config.EngineFactory.location = d['location']
227
227
228 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
228 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
229 try:
229 try:
230 config.EngineFactory.url
230 config.EngineFactory.url
231 except AttributeError:
231 except AttributeError:
232 config.EngineFactory.url = d['url']
232 config.EngineFactory.url = d['url']
233
233
234 try:
234 try:
235 config.EngineFactory.sshserver
235 config.EngineFactory.sshserver
236 except AttributeError:
236 except AttributeError:
237 config.EngineFactory.sshserver = d['ssh']
237 config.EngineFactory.sshserver = d['ssh']
238
238
239 def init_engine(self):
239 def init_engine(self):
240 # This is the working dir by now.
240 # This is the working dir by now.
241 sys.path.insert(0, '')
241 sys.path.insert(0, '')
242 config = self.config
242 config = self.config
243 # print config
243 # print config
244 self.find_url_file()
244 self.find_url_file()
245
245
246 # was the url manually specified?
246 # was the url manually specified?
247 keys = set(self.config.EngineFactory.keys())
247 keys = set(self.config.EngineFactory.keys())
248 keys = keys.union(set(self.config.RegistrationFactory.keys()))
248 keys = keys.union(set(self.config.RegistrationFactory.keys()))
249
249
250 if keys.intersection(set(['ip', 'url', 'port'])):
250 if keys.intersection(set(['ip', 'url', 'port'])):
251 # Connection info was specified, don't wait for the file
251 # Connection info was specified, don't wait for the file
252 url_specified = True
252 url_specified = True
253 self.wait_for_url_file = 0
253 self.wait_for_url_file = 0
254 else:
254 else:
255 url_specified = False
255 url_specified = False
256
256
257 if self.wait_for_url_file and not os.path.exists(self.url_file):
257 if self.wait_for_url_file and not os.path.exists(self.url_file):
258 self.log.warn("url_file %r not found"%self.url_file)
258 self.log.warn("url_file %r not found"%self.url_file)
259 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
259 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
260 tic = time.time()
260 tic = time.time()
261 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
261 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
262 # wait for url_file to exist, for up to 10 seconds
262 # wait for url_file to exist, for up to 10 seconds
263 time.sleep(0.1)
263 time.sleep(0.1)
264
264
265 if os.path.exists(self.url_file):
265 if os.path.exists(self.url_file):
266 self.load_connector_file()
266 self.load_connector_file()
267 elif not url_specified:
267 elif not url_specified:
268 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
268 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
269 self.exit(1)
269 self.exit(1)
270
270
271
271
272 try:
272 try:
273 exec_lines = config.Kernel.exec_lines
273 exec_lines = config.Kernel.exec_lines
274 except AttributeError:
274 except AttributeError:
275 config.Kernel.exec_lines = []
275 config.Kernel.exec_lines = []
276 exec_lines = config.Kernel.exec_lines
276 exec_lines = config.Kernel.exec_lines
277
277
278 if self.startup_script:
278 if self.startup_script:
279 enc = sys.getfilesystemencoding() or 'utf8'
279 enc = sys.getfilesystemencoding() or 'utf8'
280 cmd="execfile(%r)"%self.startup_script.encode(enc)
280 cmd="execfile(%r)"%self.startup_script.encode(enc)
281 exec_lines.append(cmd)
281 exec_lines.append(cmd)
282 if self.startup_command:
282 if self.startup_command:
283 exec_lines.append(self.startup_command)
283 exec_lines.append(self.startup_command)
284
284
285 # Create the underlying shell class and Engine
285 # Create the underlying shell class and Engine
286 # shell_class = import_item(self.master_config.Global.shell_class)
286 # shell_class = import_item(self.master_config.Global.shell_class)
287 # print self.config
287 # print self.config
288 try:
288 try:
289 self.engine = EngineFactory(config=config, log=self.log)
289 self.engine = EngineFactory(config=config, log=self.log)
290 except:
290 except:
291 self.log.error("Couldn't start the Engine", exc_info=True)
291 self.log.error("Couldn't start the Engine", exc_info=True)
292 self.exit(1)
292 self.exit(1)
293
293
294 def forward_logging(self):
294 def forward_logging(self):
295 if self.log_url:
295 if self.log_url:
296 self.log.info("Forwarding logging to %s"%self.log_url)
296 self.log.info("Forwarding logging to %s"%self.log_url)
297 context = self.engine.context
297 context = self.engine.context
298 lsock = context.socket(zmq.PUB)
298 lsock = context.socket(zmq.PUB)
299 lsock.connect(self.log_url)
299 lsock.connect(self.log_url)
300 self.log.removeHandler(self._log_handler)
300 self.log.removeHandler(self._log_handler)
301 handler = EnginePUBHandler(self.engine, lsock)
301 handler = EnginePUBHandler(self.engine, lsock)
302 handler.setLevel(self.log_level)
302 handler.setLevel(self.log_level)
303 self.log.addHandler(handler)
303 self.log.addHandler(handler)
304 self._log_handler = handler
304 self._log_handler = handler
305
305
306 def init_mpi(self):
306 def init_mpi(self):
307 global mpi
307 global mpi
308 self.mpi = MPI(config=self.config)
308 self.mpi = MPI(config=self.config)
309
309
310 mpi_import_statement = self.mpi.init_script
310 mpi_import_statement = self.mpi.init_script
311 if mpi_import_statement:
311 if mpi_import_statement:
312 try:
312 try:
313 self.log.info("Initializing MPI:")
313 self.log.info("Initializing MPI:")
314 self.log.info(mpi_import_statement)
314 self.log.info(mpi_import_statement)
315 exec mpi_import_statement in globals()
315 exec mpi_import_statement in globals()
316 except:
316 except:
317 mpi = None
317 mpi = None
318 else:
318 else:
319 mpi = None
319 mpi = None
320
320
321 def initialize(self, argv=None):
321 def initialize(self, argv=None):
322 super(IPEngineApp, self).initialize(argv)
322 super(IPEngineApp, self).initialize(argv)
323 self.init_mpi()
323 self.init_mpi()
324 self.init_engine()
324 self.init_engine()
325 self.forward_logging()
325 self.forward_logging()
326
326
327 def start(self):
327 def start(self):
328 self.engine.start()
328 self.engine.start()
329 try:
329 try:
330 self.engine.loop.start()
330 self.engine.loop.start()
331 except KeyboardInterrupt:
331 except KeyboardInterrupt:
332 self.log.critical("Engine Interrupted, shutting down...\n")
332 self.log.critical("Engine Interrupted, shutting down...\n")
333
333
334
334
335 def launch_new_instance():
335 def launch_new_instance():
336 """Create and run the IPython engine"""
336 """Create and run the IPython engine"""
337 app = IPEngineApp.instance()
337 app = IPEngineApp.instance()
338 app.initialize()
338 app.initialize()
339 app.start()
339 app.start()
340
340
341
341
342 if __name__ == '__main__':
342 if __name__ == '__main__':
343 launch_new_instance()
343 launch_new_instance()
344
344
@@ -1,1169 +1,1176 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import time
27 import time
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
45 try:
45 try:
46 from subprocess import check_output
46 from subprocess import check_output
47 except ImportError:
47 except ImportError:
48 # pre-2.7, define check_output with Popen
48 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
49 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
50 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
51 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
52 out,err = p.communicate()
53 return out
53 return out
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import (
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
62 )
63 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65
65
66 from .win32support import forward_read_events
66 from .win32support import forward_read_events
67
67
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69
69
70 WINDOWS = os.name == 'nt'
70 WINDOWS = os.name == 'nt'
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
73 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 'IPython.parallel.apps.ipclusterapp'
78 'IPython.parallel.apps.ipclusterapp'
79 ))
79 ))
80
80
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 'IPython.parallel.apps.ipengineapp'
82 'IPython.parallel.apps.ipengineapp'
83 ))
83 ))
84
84
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 'IPython.parallel.apps.ipcontrollerapp'
86 'IPython.parallel.apps.ipcontrollerapp'
87 ))
87 ))
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # Base launchers and errors
90 # Base launchers and errors
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92
92
93
93
94 class LauncherError(Exception):
94 class LauncherError(Exception):
95 pass
95 pass
96
96
97
97
98 class ProcessStateError(LauncherError):
98 class ProcessStateError(LauncherError):
99 pass
99 pass
100
100
101
101
102 class UnknownStatus(LauncherError):
102 class UnknownStatus(LauncherError):
103 pass
103 pass
104
104
105
105
106 class BaseLauncher(LoggingConfigurable):
106 class BaseLauncher(LoggingConfigurable):
107 """An asbtraction for starting, stopping and signaling a process."""
107 """An asbtraction for starting, stopping and signaling a process."""
108
108
109 # In all of the launchers, the work_dir is where child processes will be
109 # In all of the launchers, the work_dir is where child processes will be
110 # run. This will usually be the profile_dir, but may not be. any work_dir
110 # run. This will usually be the profile_dir, but may not be. any work_dir
111 # passed into the __init__ method will override the config value.
111 # passed into the __init__ method will override the config value.
112 # This should not be used to set the work_dir for the actual engine
112 # This should not be used to set the work_dir for the actual engine
113 # and controller. Instead, use their own config files or the
113 # and controller. Instead, use their own config files or the
114 # controller_args, engine_args attributes of the launchers to add
114 # controller_args, engine_args attributes of the launchers to add
115 # the work_dir option.
115 # the work_dir option.
116 work_dir = Unicode(u'.')
116 work_dir = Unicode(u'.')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118
118
119 start_data = Any()
119 start_data = Any()
120 stop_data = Any()
120 stop_data = Any()
121
121
122 def _loop_default(self):
122 def _loop_default(self):
123 return ioloop.IOLoop.instance()
123 return ioloop.IOLoop.instance()
124
124
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 self.state = 'before' # can be before, running, after
127 self.state = 'before' # can be before, running, after
128 self.stop_callbacks = []
128 self.stop_callbacks = []
129 self.start_data = None
129 self.start_data = None
130 self.stop_data = None
130 self.stop_data = None
131
131
132 @property
132 @property
133 def args(self):
133 def args(self):
134 """A list of cmd and args that will be used to start the process.
134 """A list of cmd and args that will be used to start the process.
135
135
136 This is what is passed to :func:`spawnProcess` and the first element
136 This is what is passed to :func:`spawnProcess` and the first element
137 will be the process name.
137 will be the process name.
138 """
138 """
139 return self.find_args()
139 return self.find_args()
140
140
141 def find_args(self):
141 def find_args(self):
142 """The ``.args`` property calls this to find the args list.
142 """The ``.args`` property calls this to find the args list.
143
143
144 Subcommand should implement this to construct the cmd and args.
144 Subcommand should implement this to construct the cmd and args.
145 """
145 """
146 raise NotImplementedError('find_args must be implemented in a subclass')
146 raise NotImplementedError('find_args must be implemented in a subclass')
147
147
148 @property
148 @property
149 def arg_str(self):
149 def arg_str(self):
150 """The string form of the program arguments."""
150 """The string form of the program arguments."""
151 return ' '.join(self.args)
151 return ' '.join(self.args)
152
152
153 @property
153 @property
154 def running(self):
154 def running(self):
155 """Am I running."""
155 """Am I running."""
156 if self.state == 'running':
156 if self.state == 'running':
157 return True
157 return True
158 else:
158 else:
159 return False
159 return False
160
160
161 def start(self):
161 def start(self):
162 """Start the process."""
162 """Start the process."""
163 raise NotImplementedError('start must be implemented in a subclass')
163 raise NotImplementedError('start must be implemented in a subclass')
164
164
165 def stop(self):
165 def stop(self):
166 """Stop the process and notify observers of stopping.
166 """Stop the process and notify observers of stopping.
167
167
168 This method will return None immediately.
168 This method will return None immediately.
169 To observe the actual process stopping, see :meth:`on_stop`.
169 To observe the actual process stopping, see :meth:`on_stop`.
170 """
170 """
171 raise NotImplementedError('stop must be implemented in a subclass')
171 raise NotImplementedError('stop must be implemented in a subclass')
172
172
173 def on_stop(self, f):
173 def on_stop(self, f):
174 """Register a callback to be called with this Launcher's stop_data
174 """Register a callback to be called with this Launcher's stop_data
175 when the process actually finishes.
175 when the process actually finishes.
176 """
176 """
177 if self.state=='after':
177 if self.state=='after':
178 return f(self.stop_data)
178 return f(self.stop_data)
179 else:
179 else:
180 self.stop_callbacks.append(f)
180 self.stop_callbacks.append(f)
181
181
182 def notify_start(self, data):
182 def notify_start(self, data):
183 """Call this to trigger startup actions.
183 """Call this to trigger startup actions.
184
184
185 This logs the process startup and sets the state to 'running'. It is
185 This logs the process startup and sets the state to 'running'. It is
186 a pass-through so it can be used as a callback.
186 a pass-through so it can be used as a callback.
187 """
187 """
188
188
189 self.log.info('Process %r started: %r' % (self.args[0], data))
189 self.log.info('Process %r started: %r' % (self.args[0], data))
190 self.start_data = data
190 self.start_data = data
191 self.state = 'running'
191 self.state = 'running'
192 return data
192 return data
193
193
194 def notify_stop(self, data):
194 def notify_stop(self, data):
195 """Call this to trigger process stop actions.
195 """Call this to trigger process stop actions.
196
196
197 This logs the process stopping and sets the state to 'after'. Call
197 This logs the process stopping and sets the state to 'after'. Call
198 this to trigger callbacks registered via :meth:`on_stop`."""
198 this to trigger callbacks registered via :meth:`on_stop`."""
199
199
200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
201 self.stop_data = data
201 self.stop_data = data
202 self.state = 'after'
202 self.state = 'after'
203 for i in range(len(self.stop_callbacks)):
203 for i in range(len(self.stop_callbacks)):
204 d = self.stop_callbacks.pop()
204 d = self.stop_callbacks.pop()
205 d(data)
205 d(data)
206 return data
206 return data
207
207
208 def signal(self, sig):
208 def signal(self, sig):
209 """Signal the process.
209 """Signal the process.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 sig : str or int
213 sig : str or int
214 'KILL', 'INT', etc., or any signal number
214 'KILL', 'INT', etc., or any signal number
215 """
215 """
216 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
217
217
218 class ClusterAppMixin(HasTraits):
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
220 cluster_args = List([])
221 profile_dir=Unicode('')
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
224 self.cluster_args = []
225 if self.profile_dir:
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
229 _cluster_id_changed = _profile_dir_changed
230
230
231 class ControllerMixin(ClusterAppMixin):
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
236 help="""command-line args to pass to ipcontroller""")
237
237
238 class EngineMixin(ClusterAppMixin):
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
243 help="command-line arguments to pass to ipengine"
244 )
244 )
245
245
246 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
247 # Local process launchers
247 # Local process launchers
248 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
249
249
250
250
251 class LocalProcessLauncher(BaseLauncher):
251 class LocalProcessLauncher(BaseLauncher):
252 """Start and stop an external process in an asynchronous manner.
252 """Start and stop an external process in an asynchronous manner.
253
253
254 This will launch the external process with a working directory of
254 This will launch the external process with a working directory of
255 ``self.work_dir``.
255 ``self.work_dir``.
256 """
256 """
257
257
258 # This is used to to construct self.args, which is passed to
258 # This is used to to construct self.args, which is passed to
259 # spawnProcess.
259 # spawnProcess.
260 cmd_and_args = List([])
260 cmd_and_args = List([])
261 poll_frequency = Int(100) # in ms
261 poll_frequency = Int(100) # in ms
262
262
263 def __init__(self, work_dir=u'.', config=None, **kwargs):
263 def __init__(self, work_dir=u'.', config=None, **kwargs):
264 super(LocalProcessLauncher, self).__init__(
264 super(LocalProcessLauncher, self).__init__(
265 work_dir=work_dir, config=config, **kwargs
265 work_dir=work_dir, config=config, **kwargs
266 )
266 )
267 self.process = None
267 self.process = None
268 self.poller = None
268 self.poller = None
269
269
270 def find_args(self):
270 def find_args(self):
271 return self.cmd_and_args
271 return self.cmd_and_args
272
272
273 def start(self):
273 def start(self):
274 if self.state == 'before':
274 if self.state == 'before':
275 self.process = Popen(self.args,
275 self.process = Popen(self.args,
276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
277 env=os.environ,
277 env=os.environ,
278 cwd=self.work_dir
278 cwd=self.work_dir
279 )
279 )
280 if WINDOWS:
280 if WINDOWS:
281 self.stdout = forward_read_events(self.process.stdout)
281 self.stdout = forward_read_events(self.process.stdout)
282 self.stderr = forward_read_events(self.process.stderr)
282 self.stderr = forward_read_events(self.process.stderr)
283 else:
283 else:
284 self.stdout = self.process.stdout.fileno()
284 self.stdout = self.process.stdout.fileno()
285 self.stderr = self.process.stderr.fileno()
285 self.stderr = self.process.stderr.fileno()
286 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
286 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
287 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
287 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
288 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
288 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
289 self.poller.start()
289 self.poller.start()
290 self.notify_start(self.process.pid)
290 self.notify_start(self.process.pid)
291 else:
291 else:
292 s = 'The process was already started and has state: %r' % self.state
292 s = 'The process was already started and has state: %r' % self.state
293 raise ProcessStateError(s)
293 raise ProcessStateError(s)
294
294
295 def stop(self):
295 def stop(self):
296 return self.interrupt_then_kill()
296 return self.interrupt_then_kill()
297
297
298 def signal(self, sig):
298 def signal(self, sig):
299 if self.state == 'running':
299 if self.state == 'running':
300 if WINDOWS and sig != SIGINT:
300 if WINDOWS and sig != SIGINT:
301 # use Windows tree-kill for better child cleanup
301 # use Windows tree-kill for better child cleanup
302 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
302 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
303 else:
303 else:
304 self.process.send_signal(sig)
304 self.process.send_signal(sig)
305
305
306 def interrupt_then_kill(self, delay=2.0):
306 def interrupt_then_kill(self, delay=2.0):
307 """Send INT, wait a delay and then send KILL."""
307 """Send INT, wait a delay and then send KILL."""
308 try:
308 try:
309 self.signal(SIGINT)
309 self.signal(SIGINT)
310 except Exception:
310 except Exception:
311 self.log.debug("interrupt failed")
311 self.log.debug("interrupt failed")
312 pass
312 pass
313 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
313 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
314 self.killer.start()
314 self.killer.start()
315
315
316 # callbacks, etc:
316 # callbacks, etc:
317
317
318 def handle_stdout(self, fd, events):
318 def handle_stdout(self, fd, events):
319 if WINDOWS:
319 if WINDOWS:
320 line = self.stdout.recv()
320 line = self.stdout.recv()
321 else:
321 else:
322 line = self.process.stdout.readline()
322 line = self.process.stdout.readline()
323 # a stopped process will be readable but return empty strings
323 # a stopped process will be readable but return empty strings
324 if line:
324 if line:
325 self.log.info(line[:-1])
325 self.log.info(line[:-1])
326 else:
326 else:
327 self.poll()
327 self.poll()
328
328
329 def handle_stderr(self, fd, events):
329 def handle_stderr(self, fd, events):
330 if WINDOWS:
330 if WINDOWS:
331 line = self.stderr.recv()
331 line = self.stderr.recv()
332 else:
332 else:
333 line = self.process.stderr.readline()
333 line = self.process.stderr.readline()
334 # a stopped process will be readable but return empty strings
334 # a stopped process will be readable but return empty strings
335 if line:
335 if line:
336 self.log.error(line[:-1])
336 self.log.error(line[:-1])
337 else:
337 else:
338 self.poll()
338 self.poll()
339
339
340 def poll(self):
340 def poll(self):
341 status = self.process.poll()
341 status = self.process.poll()
342 if status is not None:
342 if status is not None:
343 self.poller.stop()
343 self.poller.stop()
344 self.loop.remove_handler(self.stdout)
344 self.loop.remove_handler(self.stdout)
345 self.loop.remove_handler(self.stderr)
345 self.loop.remove_handler(self.stderr)
346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
347 return status
347 return status
348
348
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
350 """Launch a controller as a regular external process."""
350 """Launch a controller as a regular external process."""
351
351
352 def find_args(self):
352 def find_args(self):
353 return self.controller_cmd + self.cluster_args + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
354
354
355 def start(self):
355 def start(self):
356 """Start the controller by profile_dir."""
356 """Start the controller by profile_dir."""
357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
358 return super(LocalControllerLauncher, self).start()
358 return super(LocalControllerLauncher, self).start()
359
359
360
360
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
362 """Launch a single engine as a regular externall process."""
362 """Launch a single engine as a regular externall process."""
363
363
364 def find_args(self):
364 def find_args(self):
365 return self.engine_cmd + self.cluster_args + self.engine_args
365 return self.engine_cmd + self.cluster_args + self.engine_args
366
366
367
367
368 class LocalEngineSetLauncher(LocalEngineLauncher):
368 class LocalEngineSetLauncher(LocalEngineLauncher):
369 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
370
370
371 delay = CFloat(0.1, config=True,
371 delay = CFloat(0.1, config=True,
372 help="""delay (in seconds) between starting each engine after the first.
372 help="""delay (in seconds) between starting each engine after the first.
373 This can help force the engines to get their ids in order, or limit
373 This can help force the engines to get their ids in order, or limit
374 process flood when starting many engines."""
374 process flood when starting many engines."""
375 )
375 )
376
376
377 # launcher class
377 # launcher class
378 launcher_class = LocalEngineLauncher
378 launcher_class = LocalEngineLauncher
379
379
380 launchers = Dict()
380 launchers = Dict()
381 stop_data = Dict()
381 stop_data = Dict()
382
382
383 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 def __init__(self, work_dir=u'.', config=None, **kwargs):
384 super(LocalEngineSetLauncher, self).__init__(
384 super(LocalEngineSetLauncher, self).__init__(
385 work_dir=work_dir, config=config, **kwargs
385 work_dir=work_dir, config=config, **kwargs
386 )
386 )
387 self.stop_data = {}
387 self.stop_data = {}
388
388
389 def start(self, n):
389 def start(self, n):
390 """Start n engines by profile or profile_dir."""
390 """Start n engines by profile or profile_dir."""
391 dlist = []
391 dlist = []
392 for i in range(n):
392 for i in range(n):
393 if i > 0:
393 if i > 0:
394 time.sleep(self.delay)
394 time.sleep(self.delay)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
397 )
398
398
399 # Copy the engine args over to each engine launcher.
399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
401 el.engine_args = copy.deepcopy(self.engine_args)
401 el.engine_args = copy.deepcopy(self.engine_args)
402 el.on_stop(self._notice_engine_stopped)
402 el.on_stop(self._notice_engine_stopped)
403 d = el.start()
403 d = el.start()
404 if i==0:
404 if i==0:
405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
406 self.launchers[i] = el
406 self.launchers[i] = el
407 dlist.append(d)
407 dlist.append(d)
408 self.notify_start(dlist)
408 self.notify_start(dlist)
409 return dlist
409 return dlist
410
410
411 def find_args(self):
411 def find_args(self):
412 return ['engine set']
412 return ['engine set']
413
413
414 def signal(self, sig):
414 def signal(self, sig):
415 dlist = []
415 dlist = []
416 for el in self.launchers.itervalues():
416 for el in self.launchers.itervalues():
417 d = el.signal(sig)
417 d = el.signal(sig)
418 dlist.append(d)
418 dlist.append(d)
419 return dlist
419 return dlist
420
420
421 def interrupt_then_kill(self, delay=1.0):
421 def interrupt_then_kill(self, delay=1.0):
422 dlist = []
422 dlist = []
423 for el in self.launchers.itervalues():
423 for el in self.launchers.itervalues():
424 d = el.interrupt_then_kill(delay)
424 d = el.interrupt_then_kill(delay)
425 dlist.append(d)
425 dlist.append(d)
426 return dlist
426 return dlist
427
427
428 def stop(self):
428 def stop(self):
429 return self.interrupt_then_kill()
429 return self.interrupt_then_kill()
430
430
431 def _notice_engine_stopped(self, data):
431 def _notice_engine_stopped(self, data):
432 pid = data['pid']
432 pid = data['pid']
433 for idx,el in self.launchers.iteritems():
433 for idx,el in self.launchers.iteritems():
434 if el.process.pid == pid:
434 if el.process.pid == pid:
435 break
435 break
436 self.launchers.pop(idx)
436 self.launchers.pop(idx)
437 self.stop_data[idx] = data
437 self.stop_data[idx] = data
438 if not self.launchers:
438 if not self.launchers:
439 self.notify_stop(self.stop_data)
439 self.notify_stop(self.stop_data)
440
440
441
441
442 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
443 # MPIExec launchers
443 # MPIExec launchers
444 #-----------------------------------------------------------------------------
444 #-----------------------------------------------------------------------------
445
445
446
446
447 class MPIExecLauncher(LocalProcessLauncher):
447 class MPIExecLauncher(LocalProcessLauncher):
448 """Launch an external process using mpiexec."""
448 """Launch an external process using mpiexec."""
449
449
450 mpi_cmd = List(['mpiexec'], config=True,
450 mpi_cmd = List(['mpiexec'], config=True,
451 help="The mpiexec command to use in starting the process."
451 help="The mpiexec command to use in starting the process."
452 )
452 )
453 mpi_args = List([], config=True,
453 mpi_args = List([], config=True,
454 help="The command line arguments to pass to mpiexec."
454 help="The command line arguments to pass to mpiexec."
455 )
455 )
456 program = List(['date'],
456 program = List(['date'],
457 help="The program to start via mpiexec.")
457 help="The program to start via mpiexec.")
458 program_args = List([],
458 program_args = List([],
459 help="The command line argument to the program."
459 help="The command line argument to the program."
460 )
460 )
461 n = Int(1)
461 n = Int(1)
462
462
463 def find_args(self):
463 def find_args(self):
464 """Build self.args using all the fields."""
464 """Build self.args using all the fields."""
465 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
466 self.program + self.program_args
466 self.program + self.program_args
467
467
468 def start(self, n):
468 def start(self, n):
469 """Start n instances of the program using mpiexec."""
469 """Start n instances of the program using mpiexec."""
470 self.n = n
470 self.n = n
471 return super(MPIExecLauncher, self).start()
471 return super(MPIExecLauncher, self).start()
472
472
473
473
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
475 """Launch a controller using mpiexec."""
475 """Launch a controller using mpiexec."""
476
476
477 # alias back to *non-configurable* program[_args] for use in find_args()
477 # alias back to *non-configurable* program[_args] for use in find_args()
478 # this way all Controller/EngineSetLaunchers have the same form, rather
478 # this way all Controller/EngineSetLaunchers have the same form, rather
479 # than *some* having `program_args` and others `controller_args`
479 # than *some* having `program_args` and others `controller_args`
480 @property
480 @property
481 def program(self):
481 def program(self):
482 return self.controller_cmd
482 return self.controller_cmd
483
483
484 @property
484 @property
485 def program_args(self):
485 def program_args(self):
486 return self.cluster_args + self.controller_args
486 return self.cluster_args + self.controller_args
487
487
488 def start(self):
488 def start(self):
489 """Start the controller by profile_dir."""
489 """Start the controller by profile_dir."""
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
491 return super(MPIExecControllerLauncher, self).start(1)
491 return super(MPIExecControllerLauncher, self).start(1)
492
492
493
493
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 """Launch engines using mpiexec"""
495 """Launch engines using mpiexec"""
496
496
497 # alias back to *non-configurable* program[_args] for use in find_args()
497 # alias back to *non-configurable* program[_args] for use in find_args()
498 # this way all Controller/EngineSetLaunchers have the same form, rather
498 # this way all Controller/EngineSetLaunchers have the same form, rather
499 # than *some* having `program_args` and others `controller_args`
499 # than *some* having `program_args` and others `controller_args`
500 @property
500 @property
501 def program(self):
501 def program(self):
502 return self.engine_cmd
502 return self.engine_cmd
503
503
504 @property
504 @property
505 def program_args(self):
505 def program_args(self):
506 return self.cluster_args + self.engine_args
506 return self.cluster_args + self.engine_args
507
507
508 def start(self, n):
508 def start(self, n):
509 """Start n engines by profile or profile_dir."""
509 """Start n engines by profile or profile_dir."""
510 self.n = n
510 self.n = n
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 return super(MPIExecEngineSetLauncher, self).start(n)
512 return super(MPIExecEngineSetLauncher, self).start(n)
513
513
514 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
515 # SSH launchers
515 # SSH launchers
516 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
517
517
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
519
519
520 class SSHLauncher(LocalProcessLauncher):
520 class SSHLauncher(LocalProcessLauncher):
521 """A minimal launcher for ssh.
521 """A minimal launcher for ssh.
522
522
523 To be useful this will probably have to be extended to use the ``sshx``
523 To be useful this will probably have to be extended to use the ``sshx``
524 idea for environment variables. There could be other things this needs
524 idea for environment variables. There could be other things this needs
525 as well.
525 as well.
526 """
526 """
527
527
528 ssh_cmd = List(['ssh'], config=True,
528 ssh_cmd = List(['ssh'], config=True,
529 help="command for starting ssh")
529 help="command for starting ssh")
530 ssh_args = List(['-tt'], config=True,
530 ssh_args = List(['-tt'], config=True,
531 help="args to pass to ssh")
531 help="args to pass to ssh")
532 program = List(['date'],
532 program = List(['date'],
533 help="Program to launch via ssh")
533 help="Program to launch via ssh")
534 program_args = List([],
534 program_args = List([],
535 help="args to pass to remote program")
535 help="args to pass to remote program")
536 hostname = Unicode('', config=True,
536 hostname = Unicode('', config=True,
537 help="hostname on which to launch the program")
537 help="hostname on which to launch the program")
538 user = Unicode('', config=True,
538 user = Unicode('', config=True,
539 help="username for ssh")
539 help="username for ssh")
540 location = Unicode('', config=True,
540 location = Unicode('', config=True,
541 help="user@hostname location for ssh in one setting")
541 help="user@hostname location for ssh in one setting")
542
542
543 def _hostname_changed(self, name, old, new):
543 def _hostname_changed(self, name, old, new):
544 if self.user:
544 if self.user:
545 self.location = u'%s@%s' % (self.user, new)
545 self.location = u'%s@%s' % (self.user, new)
546 else:
546 else:
547 self.location = new
547 self.location = new
548
548
549 def _user_changed(self, name, old, new):
549 def _user_changed(self, name, old, new):
550 self.location = u'%s@%s' % (new, self.hostname)
550 self.location = u'%s@%s' % (new, self.hostname)
551
551
552 def find_args(self):
552 def find_args(self):
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 self.program + self.program_args
554 self.program + self.program_args
555
555
556 def start(self, hostname=None, user=None):
556 def start(self, hostname=None, user=None):
557 if hostname is not None:
557 if hostname is not None:
558 self.hostname = hostname
558 self.hostname = hostname
559 if user is not None:
559 if user is not None:
560 self.user = user
560 self.user = user
561
561
562 return super(SSHLauncher, self).start()
562 return super(SSHLauncher, self).start()
563
563
564 def signal(self, sig):
564 def signal(self, sig):
565 if self.state == 'running':
565 if self.state == 'running':
566 # send escaped ssh connection-closer
566 # send escaped ssh connection-closer
567 self.process.stdin.write('~.')
567 self.process.stdin.write('~.')
568 self.process.stdin.flush()
568 self.process.stdin.flush()
569
569
570
570
571
571
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
573
573
574 # alias back to *non-configurable* program[_args] for use in find_args()
574 # alias back to *non-configurable* program[_args] for use in find_args()
575 # this way all Controller/EngineSetLaunchers have the same form, rather
575 # this way all Controller/EngineSetLaunchers have the same form, rather
576 # than *some* having `program_args` and others `controller_args`
576 # than *some* having `program_args` and others `controller_args`
577 @property
577 @property
578 def program(self):
578 def program(self):
579 return self.controller_cmd
579 return self.controller_cmd
580
580
581 @property
581 @property
582 def program_args(self):
582 def program_args(self):
583 return self.cluster_args + self.controller_args
583 return self.cluster_args + self.controller_args
584
584
585
585
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
587
587
588 # alias back to *non-configurable* program[_args] for use in find_args()
588 # alias back to *non-configurable* program[_args] for use in find_args()
589 # this way all Controller/EngineSetLaunchers have the same form, rather
589 # this way all Controller/EngineSetLaunchers have the same form, rather
590 # than *some* having `program_args` and others `controller_args`
590 # than *some* having `program_args` and others `controller_args`
591 @property
591 @property
592 def program(self):
592 def program(self):
593 return self.engine_cmd
593 return self.engine_cmd
594
594
595 @property
595 @property
596 def program_args(self):
596 def program_args(self):
597 return self.cluster_args + self.engine_args
597 return self.cluster_args + self.engine_args
598
598
599
599
600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
601 launcher_class = SSHEngineLauncher
601 launcher_class = SSHEngineLauncher
602 engines = Dict(config=True,
602 engines = Dict(config=True,
603 help="""dict of engines to launch. This is a dict by hostname of ints,
603 help="""dict of engines to launch. This is a dict by hostname of ints,
604 corresponding to the number of engines to start on that host.""")
604 corresponding to the number of engines to start on that host.""")
605
605
606 def start(self, n):
606 def start(self, n):
607 """Start engines by profile or profile_dir.
607 """Start engines by profile or profile_dir.
608 `n` is ignored, and the `engines` config property is used instead.
608 `n` is ignored, and the `engines` config property is used instead.
609 """
609 """
610
610
611 dlist = []
611 dlist = []
612 for host, n in self.engines.iteritems():
612 for host, n in self.engines.iteritems():
613 if isinstance(n, (tuple, list)):
613 if isinstance(n, (tuple, list)):
614 n, args = n
614 n, args = n
615 else:
615 else:
616 args = copy.deepcopy(self.engine_args)
616 args = copy.deepcopy(self.engine_args)
617
617
618 if '@' in host:
618 if '@' in host:
619 user,host = host.split('@',1)
619 user,host = host.split('@',1)
620 else:
620 else:
621 user=None
621 user=None
622 for i in range(n):
622 for i in range(n):
623 if i > 0:
623 if i > 0:
624 time.sleep(self.delay)
624 time.sleep(self.delay)
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 )
627 )
628
628
629 # Copy the engine args over to each engine launcher.
629 # Copy the engine args over to each engine launcher.
630 el.engine_cmd = self.engine_cmd
630 el.engine_cmd = self.engine_cmd
631 el.engine_args = args
631 el.engine_args = args
632 el.on_stop(self._notice_engine_stopped)
632 el.on_stop(self._notice_engine_stopped)
633 d = el.start(user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
634 if i==0:
634 if i==0:
635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
636 self.launchers[host+str(i)] = el
636 self.launchers[host+str(i)] = el
637 dlist.append(d)
637 dlist.append(d)
638 self.notify_start(dlist)
638 self.notify_start(dlist)
639 return dlist
639 return dlist
640
640
641
641
642
642
643 #-----------------------------------------------------------------------------
643 #-----------------------------------------------------------------------------
644 # Windows HPC Server 2008 scheduler launchers
644 # Windows HPC Server 2008 scheduler launchers
645 #-----------------------------------------------------------------------------
645 #-----------------------------------------------------------------------------
646
646
647
647
648 # This is only used on Windows.
648 # This is only used on Windows.
649 def find_job_cmd():
649 def find_job_cmd():
650 if WINDOWS:
650 if WINDOWS:
651 try:
651 try:
652 return find_cmd('job')
652 return find_cmd('job')
653 except (FindCmdError, ImportError):
653 except (FindCmdError, ImportError):
654 # ImportError will be raised if win32api is not installed
654 # ImportError will be raised if win32api is not installed
655 return 'job'
655 return 'job'
656 else:
656 else:
657 return 'job'
657 return 'job'
658
658
659
659
660 class WindowsHPCLauncher(BaseLauncher):
660 class WindowsHPCLauncher(BaseLauncher):
661
661
662 job_id_regexp = Unicode(r'\d+', config=True,
662 job_id_regexp = Unicode(r'\d+', config=True,
663 help="""A regular expression used to get the job id from the output of the
663 help="""A regular expression used to get the job id from the output of the
664 submit_command. """
664 submit_command. """
665 )
665 )
666 job_file_name = Unicode(u'ipython_job.xml', config=True,
666 job_file_name = Unicode(u'ipython_job.xml', config=True,
667 help="The filename of the instantiated job script.")
667 help="The filename of the instantiated job script.")
668 # The full path to the instantiated job script. This gets made dynamically
668 # The full path to the instantiated job script. This gets made dynamically
669 # by combining the work_dir with the job_file_name.
669 # by combining the work_dir with the job_file_name.
670 job_file = Unicode(u'')
670 job_file = Unicode(u'')
671 scheduler = Unicode('', config=True,
671 scheduler = Unicode('', config=True,
672 help="The hostname of the scheduler to submit the job to.")
672 help="The hostname of the scheduler to submit the job to.")
673 job_cmd = Unicode(find_job_cmd(), config=True,
673 job_cmd = Unicode(find_job_cmd(), config=True,
674 help="The command for submitting jobs.")
674 help="The command for submitting jobs.")
675
675
676 def __init__(self, work_dir=u'.', config=None, **kwargs):
676 def __init__(self, work_dir=u'.', config=None, **kwargs):
677 super(WindowsHPCLauncher, self).__init__(
677 super(WindowsHPCLauncher, self).__init__(
678 work_dir=work_dir, config=config, **kwargs
678 work_dir=work_dir, config=config, **kwargs
679 )
679 )
680
680
681 @property
681 @property
682 def job_file(self):
682 def job_file(self):
683 return os.path.join(self.work_dir, self.job_file_name)
683 return os.path.join(self.work_dir, self.job_file_name)
684
684
685 def write_job_file(self, n):
685 def write_job_file(self, n):
686 raise NotImplementedError("Implement write_job_file in a subclass.")
686 raise NotImplementedError("Implement write_job_file in a subclass.")
687
687
688 def find_args(self):
688 def find_args(self):
689 return [u'job.exe']
689 return [u'job.exe']
690
690
691 def parse_job_id(self, output):
691 def parse_job_id(self, output):
692 """Take the output of the submit command and return the job id."""
692 """Take the output of the submit command and return the job id."""
693 m = re.search(self.job_id_regexp, output)
693 m = re.search(self.job_id_regexp, output)
694 if m is not None:
694 if m is not None:
695 job_id = m.group()
695 job_id = m.group()
696 else:
696 else:
697 raise LauncherError("Job id couldn't be determined: %s" % output)
697 raise LauncherError("Job id couldn't be determined: %s" % output)
698 self.job_id = job_id
698 self.job_id = job_id
699 self.log.info('Job started with job id: %r' % job_id)
699 self.log.info('Job started with job id: %r' % job_id)
700 return job_id
700 return job_id
701
701
702 def start(self, n):
702 def start(self, n):
703 """Start n copies of the process using the Win HPC job scheduler."""
703 """Start n copies of the process using the Win HPC job scheduler."""
704 self.write_job_file(n)
704 self.write_job_file(n)
705 args = [
705 args = [
706 'submit',
706 'submit',
707 '/jobfile:%s' % self.job_file,
707 '/jobfile:%s' % self.job_file,
708 '/scheduler:%s' % self.scheduler
708 '/scheduler:%s' % self.scheduler
709 ]
709 ]
710 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
710 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
711
711
712 output = check_output([self.job_cmd]+args,
712 output = check_output([self.job_cmd]+args,
713 env=os.environ,
713 env=os.environ,
714 cwd=self.work_dir,
714 cwd=self.work_dir,
715 stderr=STDOUT
715 stderr=STDOUT
716 )
716 )
717 job_id = self.parse_job_id(output)
717 job_id = self.parse_job_id(output)
718 self.notify_start(job_id)
718 self.notify_start(job_id)
719 return job_id
719 return job_id
720
720
721 def stop(self):
721 def stop(self):
722 args = [
722 args = [
723 'cancel',
723 'cancel',
724 self.job_id,
724 self.job_id,
725 '/scheduler:%s' % self.scheduler
725 '/scheduler:%s' % self.scheduler
726 ]
726 ]
727 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
727 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
728 try:
728 try:
729 output = check_output([self.job_cmd]+args,
729 output = check_output([self.job_cmd]+args,
730 env=os.environ,
730 env=os.environ,
731 cwd=self.work_dir,
731 cwd=self.work_dir,
732 stderr=STDOUT
732 stderr=STDOUT
733 )
733 )
734 except:
734 except:
735 output = 'The job already appears to be stoppped: %r' % self.job_id
735 output = 'The job already appears to be stoppped: %r' % self.job_id
736 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
736 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
737 return output
737 return output
738
738
739
739
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
741
741
742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
743 help="WinHPC xml job file.")
743 help="WinHPC xml job file.")
744 controller_args = List([], config=False,
744 controller_args = List([], config=False,
745 help="extra args to pass to ipcontroller")
745 help="extra args to pass to ipcontroller")
746
746
747 def write_job_file(self, n):
747 def write_job_file(self, n):
748 job = IPControllerJob(config=self.config)
748 job = IPControllerJob(config=self.config)
749
749
750 t = IPControllerTask(config=self.config)
750 t = IPControllerTask(config=self.config)
751 # The tasks work directory is *not* the actual work directory of
751 # The tasks work directory is *not* the actual work directory of
752 # the controller. It is used as the base path for the stdout/stderr
752 # the controller. It is used as the base path for the stdout/stderr
753 # files that the scheduler redirects to.
753 # files that the scheduler redirects to.
754 t.work_directory = self.profile_dir
754 t.work_directory = self.profile_dir
755 # Add the profile_dir and from self.start().
755 # Add the profile_dir and from self.start().
756 t.controller_args.extend(self.cluster_args)
756 t.controller_args.extend(self.cluster_args)
757 t.controller_args.extend(self.controller_args)
757 t.controller_args.extend(self.controller_args)
758 job.add_task(t)
758 job.add_task(t)
759
759
760 self.log.info("Writing job description file: %s" % self.job_file)
760 self.log.info("Writing job description file: %s" % self.job_file)
761 job.write(self.job_file)
761 job.write(self.job_file)
762
762
763 @property
763 @property
764 def job_file(self):
764 def job_file(self):
765 return os.path.join(self.profile_dir, self.job_file_name)
765 return os.path.join(self.profile_dir, self.job_file_name)
766
766
767 def start(self):
767 def start(self):
768 """Start the controller by profile_dir."""
768 """Start the controller by profile_dir."""
769 return super(WindowsHPCControllerLauncher, self).start(1)
769 return super(WindowsHPCControllerLauncher, self).start(1)
770
770
771
771
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
773
773
774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
775 help="jobfile for ipengines job")
775 help="jobfile for ipengines job")
776 engine_args = List([], config=False,
776 engine_args = List([], config=False,
777 help="extra args to pas to ipengine")
777 help="extra args to pas to ipengine")
778
778
779 def write_job_file(self, n):
779 def write_job_file(self, n):
780 job = IPEngineSetJob(config=self.config)
780 job = IPEngineSetJob(config=self.config)
781
781
782 for i in range(n):
782 for i in range(n):
783 t = IPEngineTask(config=self.config)
783 t = IPEngineTask(config=self.config)
784 # The tasks work directory is *not* the actual work directory of
784 # The tasks work directory is *not* the actual work directory of
785 # the engine. It is used as the base path for the stdout/stderr
785 # the engine. It is used as the base path for the stdout/stderr
786 # files that the scheduler redirects to.
786 # files that the scheduler redirects to.
787 t.work_directory = self.profile_dir
787 t.work_directory = self.profile_dir
788 # Add the profile_dir and from self.start().
788 # Add the profile_dir and from self.start().
789 t.controller_args.extend(self.cluster_args)
789 t.controller_args.extend(self.cluster_args)
790 t.controller_args.extend(self.engine_args)
790 t.controller_args.extend(self.engine_args)
791 job.add_task(t)
791 job.add_task(t)
792
792
793 self.log.info("Writing job description file: %s" % self.job_file)
793 self.log.info("Writing job description file: %s" % self.job_file)
794 job.write(self.job_file)
794 job.write(self.job_file)
795
795
796 @property
796 @property
797 def job_file(self):
797 def job_file(self):
798 return os.path.join(self.profile_dir, self.job_file_name)
798 return os.path.join(self.profile_dir, self.job_file_name)
799
799
800 def start(self, n):
800 def start(self, n):
801 """Start the controller by profile_dir."""
801 """Start the controller by profile_dir."""
802 return super(WindowsHPCEngineSetLauncher, self).start(n)
802 return super(WindowsHPCEngineSetLauncher, self).start(n)
803
803
804
804
805 #-----------------------------------------------------------------------------
805 #-----------------------------------------------------------------------------
806 # Batch (PBS) system launchers
806 # Batch (PBS) system launchers
807 #-----------------------------------------------------------------------------
807 #-----------------------------------------------------------------------------
808
808
809 class BatchClusterAppMixin(ClusterAppMixin):
809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates context dict, rather than args"""
810 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
811 context = Dict({'profile_dir':'', 'cluster_id':''})
812 def _profile_dir_changed(self, name, old, new):
811 def _profile_dir_changed(self, name, old, new):
813 self.context[name] = new
812 self.context[name] = new
814 _cluster_id_changed = _profile_dir_changed
813 _cluster_id_changed = _profile_dir_changed
815
814
815 def _profile_dir_default(self):
816 self.context['profile_dir'] = ''
817 return ''
818 def _cluster_id_default(self):
819 self.context['cluster_id'] = ''
820 return ''
821
822
816 class BatchSystemLauncher(BaseLauncher):
823 class BatchSystemLauncher(BaseLauncher):
817 """Launch an external process using a batch system.
824 """Launch an external process using a batch system.
818
825
819 This class is designed to work with UNIX batch systems like PBS, LSF,
826 This class is designed to work with UNIX batch systems like PBS, LSF,
820 GridEngine, etc. The overall model is that there are different commands
827 GridEngine, etc. The overall model is that there are different commands
821 like qsub, qdel, etc. that handle the starting and stopping of the process.
828 like qsub, qdel, etc. that handle the starting and stopping of the process.
822
829
823 This class also has the notion of a batch script. The ``batch_template``
830 This class also has the notion of a batch script. The ``batch_template``
824 attribute can be set to a string that is a template for the batch script.
831 attribute can be set to a string that is a template for the batch script.
825 This template is instantiated using string formatting. Thus the template can
832 This template is instantiated using string formatting. Thus the template can
826 use {n} fot the number of instances. Subclasses can add additional variables
833 use {n} fot the number of instances. Subclasses can add additional variables
827 to the template dict.
834 to the template dict.
828 """
835 """
829
836
830 # Subclasses must fill these in. See PBSEngineSet
837 # Subclasses must fill these in. See PBSEngineSet
831 submit_command = List([''], config=True,
838 submit_command = List([''], config=True,
832 help="The name of the command line program used to submit jobs.")
839 help="The name of the command line program used to submit jobs.")
833 delete_command = List([''], config=True,
840 delete_command = List([''], config=True,
834 help="The name of the command line program used to delete jobs.")
841 help="The name of the command line program used to delete jobs.")
835 job_id_regexp = Unicode('', config=True,
842 job_id_regexp = Unicode('', config=True,
836 help="""A regular expression used to get the job id from the output of the
843 help="""A regular expression used to get the job id from the output of the
837 submit_command.""")
844 submit_command.""")
838 batch_template = Unicode('', config=True,
845 batch_template = Unicode('', config=True,
839 help="The string that is the batch script template itself.")
846 help="The string that is the batch script template itself.")
840 batch_template_file = Unicode(u'', config=True,
847 batch_template_file = Unicode(u'', config=True,
841 help="The file that contains the batch template.")
848 help="The file that contains the batch template.")
842 batch_file_name = Unicode(u'batch_script', config=True,
849 batch_file_name = Unicode(u'batch_script', config=True,
843 help="The filename of the instantiated batch script.")
850 help="The filename of the instantiated batch script.")
844 queue = Unicode(u'', config=True,
851 queue = Unicode(u'', config=True,
845 help="The PBS Queue.")
852 help="The PBS Queue.")
846
853
847 def _queue_changed(self, name, old, new):
854 def _queue_changed(self, name, old, new):
848 self.context[name] = new
855 self.context[name] = new
849
856
850 n = Int(1)
857 n = Int(1)
851 _n_changed = _queue_changed
858 _n_changed = _queue_changed
852
859
853 # not configurable, override in subclasses
860 # not configurable, override in subclasses
854 # PBS Job Array regex
861 # PBS Job Array regex
855 job_array_regexp = Unicode('')
862 job_array_regexp = Unicode('')
856 job_array_template = Unicode('')
863 job_array_template = Unicode('')
857 # PBS Queue regex
864 # PBS Queue regex
858 queue_regexp = Unicode('')
865 queue_regexp = Unicode('')
859 queue_template = Unicode('')
866 queue_template = Unicode('')
860 # The default batch template, override in subclasses
867 # The default batch template, override in subclasses
861 default_template = Unicode('')
868 default_template = Unicode('')
862 # The full path to the instantiated batch script.
869 # The full path to the instantiated batch script.
863 batch_file = Unicode(u'')
870 batch_file = Unicode(u'')
864 # the format dict used with batch_template:
871 # the format dict used with batch_template:
865 context = Dict()
872 context = Dict()
866 # the Formatter instance for rendering the templates:
873 # the Formatter instance for rendering the templates:
867 formatter = Instance(EvalFormatter, (), {})
874 formatter = Instance(EvalFormatter, (), {})
868
875
869
876
870 def find_args(self):
877 def find_args(self):
871 return self.submit_command + [self.batch_file]
878 return self.submit_command + [self.batch_file]
872
879
873 def __init__(self, work_dir=u'.', config=None, **kwargs):
880 def __init__(self, work_dir=u'.', config=None, **kwargs):
874 super(BatchSystemLauncher, self).__init__(
881 super(BatchSystemLauncher, self).__init__(
875 work_dir=work_dir, config=config, **kwargs
882 work_dir=work_dir, config=config, **kwargs
876 )
883 )
877 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
884 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
878
885
879 def parse_job_id(self, output):
886 def parse_job_id(self, output):
880 """Take the output of the submit command and return the job id."""
887 """Take the output of the submit command and return the job id."""
881 m = re.search(self.job_id_regexp, output)
888 m = re.search(self.job_id_regexp, output)
882 if m is not None:
889 if m is not None:
883 job_id = m.group()
890 job_id = m.group()
884 else:
891 else:
885 raise LauncherError("Job id couldn't be determined: %s" % output)
892 raise LauncherError("Job id couldn't be determined: %s" % output)
886 self.job_id = job_id
893 self.job_id = job_id
887 self.log.info('Job submitted with job id: %r' % job_id)
894 self.log.info('Job submitted with job id: %r' % job_id)
888 return job_id
895 return job_id
889
896
890 def write_batch_script(self, n):
897 def write_batch_script(self, n):
891 """Instantiate and write the batch script to the work_dir."""
898 """Instantiate and write the batch script to the work_dir."""
892 self.n = n
899 self.n = n
893 # first priority is batch_template if set
900 # first priority is batch_template if set
894 if self.batch_template_file and not self.batch_template:
901 if self.batch_template_file and not self.batch_template:
895 # second priority is batch_template_file
902 # second priority is batch_template_file
896 with open(self.batch_template_file) as f:
903 with open(self.batch_template_file) as f:
897 self.batch_template = f.read()
904 self.batch_template = f.read()
898 if not self.batch_template:
905 if not self.batch_template:
899 # third (last) priority is default_template
906 # third (last) priority is default_template
900 self.batch_template = self.default_template
907 self.batch_template = self.default_template
901
908
902 # add jobarray or queue lines to user-specified template
909 # add jobarray or queue lines to user-specified template
903 # note that this is *only* when user did not specify a template.
910 # note that this is *only* when user did not specify a template.
904 regex = re.compile(self.job_array_regexp)
911 regex = re.compile(self.job_array_regexp)
905 # print regex.search(self.batch_template)
912 # print regex.search(self.batch_template)
906 if not regex.search(self.batch_template):
913 if not regex.search(self.batch_template):
907 self.log.info("adding job array settings to batch script")
914 self.log.info("adding job array settings to batch script")
908 firstline, rest = self.batch_template.split('\n',1)
915 firstline, rest = self.batch_template.split('\n',1)
909 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
916 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
910
917
911 regex = re.compile(self.queue_regexp)
918 regex = re.compile(self.queue_regexp)
912 # print regex.search(self.batch_template)
919 # print regex.search(self.batch_template)
913 if self.queue and not regex.search(self.batch_template):
920 if self.queue and not regex.search(self.batch_template):
914 self.log.info("adding PBS queue settings to batch script")
921 self.log.info("adding PBS queue settings to batch script")
915 firstline, rest = self.batch_template.split('\n',1)
922 firstline, rest = self.batch_template.split('\n',1)
916 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
923 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
917
924
918 script_as_string = self.formatter.format(self.batch_template, **self.context)
925 script_as_string = self.formatter.format(self.batch_template, **self.context)
919 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
926 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
920
927
921 with open(self.batch_file, 'w') as f:
928 with open(self.batch_file, 'w') as f:
922 f.write(script_as_string)
929 f.write(script_as_string)
923 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
930 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
924
931
925 def start(self, n):
932 def start(self, n):
926 """Start n copies of the process using a batch system."""
933 """Start n copies of the process using a batch system."""
927 # Here we save profile_dir in the context so they
934 # Here we save profile_dir in the context so they
928 # can be used in the batch script template as {profile_dir}
935 # can be used in the batch script template as {profile_dir}
929 self.write_batch_script(n)
936 self.write_batch_script(n)
930 output = check_output(self.args, env=os.environ)
937 output = check_output(self.args, env=os.environ)
931
938
932 job_id = self.parse_job_id(output)
939 job_id = self.parse_job_id(output)
933 self.notify_start(job_id)
940 self.notify_start(job_id)
934 return job_id
941 return job_id
935
942
936 def stop(self):
943 def stop(self):
937 output = check_output(self.delete_command+[self.job_id], env=os.environ)
944 output = check_output(self.delete_command+[self.job_id], env=os.environ)
938 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
945 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
939 return output
946 return output
940
947
941
948
942 class PBSLauncher(BatchSystemLauncher):
949 class PBSLauncher(BatchSystemLauncher):
943 """A BatchSystemLauncher subclass for PBS."""
950 """A BatchSystemLauncher subclass for PBS."""
944
951
945 submit_command = List(['qsub'], config=True,
952 submit_command = List(['qsub'], config=True,
946 help="The PBS submit command ['qsub']")
953 help="The PBS submit command ['qsub']")
947 delete_command = List(['qdel'], config=True,
954 delete_command = List(['qdel'], config=True,
948 help="The PBS delete command ['qsub']")
955 help="The PBS delete command ['qsub']")
949 job_id_regexp = Unicode(r'\d+', config=True,
956 job_id_regexp = Unicode(r'\d+', config=True,
950 help="Regular expresion for identifying the job ID [r'\d+']")
957 help="Regular expresion for identifying the job ID [r'\d+']")
951
958
952 batch_file = Unicode(u'')
959 batch_file = Unicode(u'')
953 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
960 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
954 job_array_template = Unicode('#PBS -t 1-{n}')
961 job_array_template = Unicode('#PBS -t 1-{n}')
955 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
962 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
956 queue_template = Unicode('#PBS -q {queue}')
963 queue_template = Unicode('#PBS -q {queue}')
957
964
958
965
959 class PBSControllerLauncher(BatchClusterAppMixin, PBSLauncher):
966 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
960 """Launch a controller using PBS."""
967 """Launch a controller using PBS."""
961
968
962 batch_file_name = Unicode(u'pbs_controller', config=True,
969 batch_file_name = Unicode(u'pbs_controller', config=True,
963 help="batch file name for the controller job.")
970 help="batch file name for the controller job.")
964 default_template= Unicode("""#!/bin/sh
971 default_template= Unicode("""#!/bin/sh
965 #PBS -V
972 #PBS -V
966 #PBS -N ipcontroller
973 #PBS -N ipcontroller
967 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
974 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
968 """%(' '.join(ipcontroller_cmd_argv)))
975 """%(' '.join(ipcontroller_cmd_argv)))
969
976
970
977
971 def start(self):
978 def start(self):
972 """Start the controller by profile or profile_dir."""
979 """Start the controller by profile or profile_dir."""
973 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
980 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
974 return super(PBSControllerLauncher, self).start(1)
981 return super(PBSControllerLauncher, self).start(1)
975
982
976
983
977 class PBSEngineSetLauncher(BatchClusterAppMixin, PBSLauncher):
984 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
978 """Launch Engines using PBS"""
985 """Launch Engines using PBS"""
979 batch_file_name = Unicode(u'pbs_engines', config=True,
986 batch_file_name = Unicode(u'pbs_engines', config=True,
980 help="batch file name for the engine(s) job.")
987 help="batch file name for the engine(s) job.")
981 default_template= Unicode(u"""#!/bin/sh
988 default_template= Unicode(u"""#!/bin/sh
982 #PBS -V
989 #PBS -V
983 #PBS -N ipengine
990 #PBS -N ipengine
984 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
991 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
985 """%(' '.join(ipengine_cmd_argv)))
992 """%(' '.join(ipengine_cmd_argv)))
986
993
987 def start(self, n):
994 def start(self, n):
988 """Start n engines by profile or profile_dir."""
995 """Start n engines by profile or profile_dir."""
989 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
996 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
990 return super(PBSEngineSetLauncher, self).start(n)
997 return super(PBSEngineSetLauncher, self).start(n)
991
998
992 #SGE is very similar to PBS
999 #SGE is very similar to PBS
993
1000
994 class SGELauncher(PBSLauncher):
1001 class SGELauncher(PBSLauncher):
995 """Sun GridEngine is a PBS clone with slightly different syntax"""
1002 """Sun GridEngine is a PBS clone with slightly different syntax"""
996 job_array_regexp = Unicode('#\$\W+\-t')
1003 job_array_regexp = Unicode('#\$\W+\-t')
997 job_array_template = Unicode('#$ -t 1-{n}')
1004 job_array_template = Unicode('#$ -t 1-{n}')
998 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1005 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
999 queue_template = Unicode('#$ -q {queue}')
1006 queue_template = Unicode('#$ -q {queue}')
1000
1007
1001 class SGEControllerLauncher(BatchClusterAppMixin, SGELauncher):
1008 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1002 """Launch a controller using SGE."""
1009 """Launch a controller using SGE."""
1003
1010
1004 batch_file_name = Unicode(u'sge_controller', config=True,
1011 batch_file_name = Unicode(u'sge_controller', config=True,
1005 help="batch file name for the ipontroller job.")
1012 help="batch file name for the ipontroller job.")
1006 default_template= Unicode(u"""#$ -V
1013 default_template= Unicode(u"""#$ -V
1007 #$ -S /bin/sh
1014 #$ -S /bin/sh
1008 #$ -N ipcontroller
1015 #$ -N ipcontroller
1009 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1016 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1010 """%(' '.join(ipcontroller_cmd_argv)))
1017 """%(' '.join(ipcontroller_cmd_argv)))
1011
1018
1012 def start(self):
1019 def start(self):
1013 """Start the controller by profile or profile_dir."""
1020 """Start the controller by profile or profile_dir."""
1014 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
1021 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
1015 return super(SGEControllerLauncher, self).start(1)
1022 return super(SGEControllerLauncher, self).start(1)
1016
1023
1017 class SGEEngineSetLauncher(BatchClusterAppMixin, SGELauncher):
1024 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1018 """Launch Engines with SGE"""
1025 """Launch Engines with SGE"""
1019 batch_file_name = Unicode(u'sge_engines', config=True,
1026 batch_file_name = Unicode(u'sge_engines', config=True,
1020 help="batch file name for the engine(s) job.")
1027 help="batch file name for the engine(s) job.")
1021 default_template = Unicode("""#$ -V
1028 default_template = Unicode("""#$ -V
1022 #$ -S /bin/sh
1029 #$ -S /bin/sh
1023 #$ -N ipengine
1030 #$ -N ipengine
1024 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1031 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1025 """%(' '.join(ipengine_cmd_argv)))
1032 """%(' '.join(ipengine_cmd_argv)))
1026
1033
1027 def start(self, n):
1034 def start(self, n):
1028 """Start n engines by profile or profile_dir."""
1035 """Start n engines by profile or profile_dir."""
1029 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1036 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1030 return super(SGEEngineSetLauncher, self).start(n)
1037 return super(SGEEngineSetLauncher, self).start(n)
1031
1038
1032
1039
1033 # LSF launchers
1040 # LSF launchers
1034
1041
1035 class LSFLauncher(BatchSystemLauncher):
1042 class LSFLauncher(BatchSystemLauncher):
1036 """A BatchSystemLauncher subclass for LSF."""
1043 """A BatchSystemLauncher subclass for LSF."""
1037
1044
1038 submit_command = List(['bsub'], config=True,
1045 submit_command = List(['bsub'], config=True,
1039 help="The PBS submit command ['bsub']")
1046 help="The PBS submit command ['bsub']")
1040 delete_command = List(['bkill'], config=True,
1047 delete_command = List(['bkill'], config=True,
1041 help="The PBS delete command ['bkill']")
1048 help="The PBS delete command ['bkill']")
1042 job_id_regexp = Unicode(r'\d+', config=True,
1049 job_id_regexp = Unicode(r'\d+', config=True,
1043 help="Regular expresion for identifying the job ID [r'\d+']")
1050 help="Regular expresion for identifying the job ID [r'\d+']")
1044
1051
1045 batch_file = Unicode(u'')
1052 batch_file = Unicode(u'')
1046 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1053 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1047 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1054 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1048 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1055 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1049 queue_template = Unicode('#BSUB -q {queue}')
1056 queue_template = Unicode('#BSUB -q {queue}')
1050
1057
1051 def start(self, n):
1058 def start(self, n):
1052 """Start n copies of the process using LSF batch system.
1059 """Start n copies of the process using LSF batch system.
1053 This cant inherit from the base class because bsub expects
1060 This cant inherit from the base class because bsub expects
1054 to be piped a shell script in order to honor the #BSUB directives :
1061 to be piped a shell script in order to honor the #BSUB directives :
1055 bsub < script
1062 bsub < script
1056 """
1063 """
1057 # Here we save profile_dir in the context so they
1064 # Here we save profile_dir in the context so they
1058 # can be used in the batch script template as {profile_dir}
1065 # can be used in the batch script template as {profile_dir}
1059 self.write_batch_script(n)
1066 self.write_batch_script(n)
1060 #output = check_output(self.args, env=os.environ)
1067 #output = check_output(self.args, env=os.environ)
1061 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1068 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1062 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1069 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1063 output,err = p.communicate()
1070 output,err = p.communicate()
1064 job_id = self.parse_job_id(output)
1071 job_id = self.parse_job_id(output)
1065 self.notify_start(job_id)
1072 self.notify_start(job_id)
1066 return job_id
1073 return job_id
1067
1074
1068
1075
1069 class LSFControllerLauncher(BatchClusterAppMixin, LSFLauncher):
1076 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1070 """Launch a controller using LSF."""
1077 """Launch a controller using LSF."""
1071
1078
1072 batch_file_name = Unicode(u'lsf_controller', config=True,
1079 batch_file_name = Unicode(u'lsf_controller', config=True,
1073 help="batch file name for the controller job.")
1080 help="batch file name for the controller job.")
1074 default_template= Unicode("""#!/bin/sh
1081 default_template= Unicode("""#!/bin/sh
1075 #BSUB -J ipcontroller
1082 #BSUB -J ipcontroller
1076 #BSUB -oo ipcontroller.o.%%J
1083 #BSUB -oo ipcontroller.o.%%J
1077 #BSUB -eo ipcontroller.e.%%J
1084 #BSUB -eo ipcontroller.e.%%J
1078 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1085 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1079 """%(' '.join(ipcontroller_cmd_argv)))
1086 """%(' '.join(ipcontroller_cmd_argv)))
1080
1087
1081 def start(self):
1088 def start(self):
1082 """Start the controller by profile or profile_dir."""
1089 """Start the controller by profile or profile_dir."""
1083 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1090 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1084 return super(LSFControllerLauncher, self).start(1)
1091 return super(LSFControllerLauncher, self).start(1)
1085
1092
1086
1093
1087 class LSFEngineSetLauncher(BatchClusterAppMixin, LSFLauncher):
1094 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1088 """Launch Engines using LSF"""
1095 """Launch Engines using LSF"""
1089 batch_file_name = Unicode(u'lsf_engines', config=True,
1096 batch_file_name = Unicode(u'lsf_engines', config=True,
1090 help="batch file name for the engine(s) job.")
1097 help="batch file name for the engine(s) job.")
1091 default_template= Unicode(u"""#!/bin/sh
1098 default_template= Unicode(u"""#!/bin/sh
1092 #BSUB -oo ipengine.o.%%J
1099 #BSUB -oo ipengine.o.%%J
1093 #BSUB -eo ipengine.e.%%J
1100 #BSUB -eo ipengine.e.%%J
1094 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1101 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1095 """%(' '.join(ipengine_cmd_argv)))
1102 """%(' '.join(ipengine_cmd_argv)))
1096
1103
1097 def start(self, n):
1104 def start(self, n):
1098 """Start n engines by profile or profile_dir."""
1105 """Start n engines by profile or profile_dir."""
1099 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1106 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1100 return super(LSFEngineSetLauncher, self).start(n)
1107 return super(LSFEngineSetLauncher, self).start(n)
1101
1108
1102
1109
1103 #-----------------------------------------------------------------------------
1110 #-----------------------------------------------------------------------------
1104 # A launcher for ipcluster itself!
1111 # A launcher for ipcluster itself!
1105 #-----------------------------------------------------------------------------
1112 #-----------------------------------------------------------------------------
1106
1113
1107
1114
1108 class IPClusterLauncher(LocalProcessLauncher):
1115 class IPClusterLauncher(LocalProcessLauncher):
1109 """Launch the ipcluster program in an external process."""
1116 """Launch the ipcluster program in an external process."""
1110
1117
1111 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1118 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1112 help="Popen command for ipcluster")
1119 help="Popen command for ipcluster")
1113 ipcluster_args = List(
1120 ipcluster_args = List(
1114 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1121 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1115 help="Command line arguments to pass to ipcluster.")
1122 help="Command line arguments to pass to ipcluster.")
1116 ipcluster_subcommand = Unicode('start')
1123 ipcluster_subcommand = Unicode('start')
1117 ipcluster_n = Int(2)
1124 ipcluster_n = Int(2)
1118
1125
1119 def find_args(self):
1126 def find_args(self):
1120 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1127 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1121 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1128 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1122
1129
1123 def start(self):
1130 def start(self):
1124 self.log.info("Starting ipcluster: %r" % self.args)
1131 self.log.info("Starting ipcluster: %r" % self.args)
1125 return super(IPClusterLauncher, self).start()
1132 return super(IPClusterLauncher, self).start()
1126
1133
1127 #-----------------------------------------------------------------------------
1134 #-----------------------------------------------------------------------------
1128 # Collections of launchers
1135 # Collections of launchers
1129 #-----------------------------------------------------------------------------
1136 #-----------------------------------------------------------------------------
1130
1137
1131 local_launchers = [
1138 local_launchers = [
1132 LocalControllerLauncher,
1139 LocalControllerLauncher,
1133 LocalEngineLauncher,
1140 LocalEngineLauncher,
1134 LocalEngineSetLauncher,
1141 LocalEngineSetLauncher,
1135 ]
1142 ]
1136 mpi_launchers = [
1143 mpi_launchers = [
1137 MPIExecLauncher,
1144 MPIExecLauncher,
1138 MPIExecControllerLauncher,
1145 MPIExecControllerLauncher,
1139 MPIExecEngineSetLauncher,
1146 MPIExecEngineSetLauncher,
1140 ]
1147 ]
1141 ssh_launchers = [
1148 ssh_launchers = [
1142 SSHLauncher,
1149 SSHLauncher,
1143 SSHControllerLauncher,
1150 SSHControllerLauncher,
1144 SSHEngineLauncher,
1151 SSHEngineLauncher,
1145 SSHEngineSetLauncher,
1152 SSHEngineSetLauncher,
1146 ]
1153 ]
1147 winhpc_launchers = [
1154 winhpc_launchers = [
1148 WindowsHPCLauncher,
1155 WindowsHPCLauncher,
1149 WindowsHPCControllerLauncher,
1156 WindowsHPCControllerLauncher,
1150 WindowsHPCEngineSetLauncher,
1157 WindowsHPCEngineSetLauncher,
1151 ]
1158 ]
1152 pbs_launchers = [
1159 pbs_launchers = [
1153 PBSLauncher,
1160 PBSLauncher,
1154 PBSControllerLauncher,
1161 PBSControllerLauncher,
1155 PBSEngineSetLauncher,
1162 PBSEngineSetLauncher,
1156 ]
1163 ]
1157 sge_launchers = [
1164 sge_launchers = [
1158 SGELauncher,
1165 SGELauncher,
1159 SGEControllerLauncher,
1166 SGEControllerLauncher,
1160 SGEEngineSetLauncher,
1167 SGEEngineSetLauncher,
1161 ]
1168 ]
1162 lsf_launchers = [
1169 lsf_launchers = [
1163 LSFLauncher,
1170 LSFLauncher,
1164 LSFControllerLauncher,
1171 LSFControllerLauncher,
1165 LSFEngineSetLauncher,
1172 LSFEngineSetLauncher,
1166 ]
1173 ]
1167 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1174 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1168 + pbs_launchers + sge_launchers + lsf_launchers
1175 + pbs_launchers + sge_launchers + lsf_launchers
1169
1176
General Comments 0
You need to be logged in to leave comments. Login now