parallel.apps cleanup per review...
MinRK
@@ -1,257 +1,261 @@
1 1 # encoding: utf-8
2 2 """
3 3 The Base Application class for IPython.parallel apps
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * Min RK
9 9
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 from __future__ import with_statement
24 24
25 25 import os
26 26 import logging
27 27 import re
28 28 import sys
29 29
30 30 from subprocess import Popen, PIPE
31 31
32 32 from IPython.core import release
33 33 from IPython.core.crashhandler import CrashHandler
34 34 from IPython.core.application import (
35 35 BaseIPythonApplication,
36 36 base_aliases as base_ip_aliases,
37 37 base_flags as base_ip_flags
38 38 )
39 39 from IPython.utils.path import expand_path
40 40
41 41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Module errors
45 45 #-----------------------------------------------------------------------------
46 46
47 47 class PIDFileError(Exception):
48 48 pass
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Crash handler for this application
53 53 #-----------------------------------------------------------------------------
54 54
55 55 class ParallelCrashHandler(CrashHandler):
56 56 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
57 57
58 58 def __init__(self, app):
59 59 contact_name = release.authors['Min'][0]
60 60 contact_email = release.authors['Min'][1]
61 61 bug_tracker = 'http://github.com/ipython/ipython/issues'
62 62 super(ParallelCrashHandler,self).__init__(
63 63 app, contact_name, contact_email, bug_tracker
64 64 )
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Main application
69 69 #-----------------------------------------------------------------------------
70 70 base_aliases = {}
71 71 base_aliases.update(base_ip_aliases)
72 72 base_aliases.update({
73 73 'profile-dir' : 'ProfileDir.location',
74 74 'work-dir' : 'BaseParallelApplication.work_dir',
75 75 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 76 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 77 'log-url' : 'BaseParallelApplication.log_url',
78 78 'cluster-id' : 'BaseParallelApplication.cluster_id',
79 79 })
80 80
81 81 base_flags = {
82 82 'log-to-file' : (
83 83 {'BaseParallelApplication' : {'log_to_file' : True}},
84 84 "send log output to a file"
85 85 )
86 86 }
87 87 base_flags.update(base_ip_flags)
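# Illustrative mapping (hypothetical command line): `ipengine --log-to-file
# --work-dir=/tmp/run --cluster-id=a` resolves via the tables above to
# BaseParallelApplication.log_to_file=True, work_dir=u'/tmp/run', cluster_id=u'a'.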
88 88
89 89 class BaseParallelApplication(BaseIPythonApplication):
90 90 """The base Application for IPython.parallel apps
91 91
92 92 Principal extensions to BaseIPythonApplication:
93 93
94 94 * work_dir
95 95 * remote logging via pyzmq
96 96 * IOLoop instance
97 97 """
98 98
99 99 crash_handler_class = ParallelCrashHandler
100 100
101 101 def _log_level_default(self):
102 102 # temporarily override default_log_level to INFO
103 103 return logging.INFO
104 104
105 105 work_dir = Unicode(os.getcwdu(), config=True,
106 106 help='Set the working dir for the process.'
107 107 )
108 108 def _work_dir_changed(self, name, old, new):
109 109 self.work_dir = unicode(expand_path(new))
110 110
111 111 log_to_file = Bool(config=True,
112 112 help="whether to log to a file")
113 113
114 114 clean_logs = Bool(False, config=True,
115 115 help="whether to cleanup old logfiles before starting")
116 116
117 117 log_url = Unicode('', config=True,
118 118 help="The ZMQ URL of the iplogger to aggregate logging.")
119 119
120 120 cluster_id = Unicode('', config=True,
121 121 help="""String id to add to runtime files, to prevent name collisions when
122 using multiple clusters with a single profile.
122 using multiple clusters with a single profile simultaneously.
123 123
124 124 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
125
126 Since this is text inserted into filenames, typical recommendations apply:
127 Simple character strings are ideal, and spaces are not recommended (but should
128 generally work).
125 129 """
126 130 )
127 131 def _cluster_id_changed(self, name, old, new):
128 132 self.name = self.__class__.name
129 133 if new:
130 134 self.name += '-%s'%new
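# e.g. (hypothetical value): cluster_id=u'mycluster' turns an app named
# 'ipcontroller' into 'ipcontroller-mycluster', so its runtime files become
# 'ipcontroller-mycluster.pid', 'ipcontroller-mycluster-engine.json', etc.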
131 135
132 136 def _config_files_default(self):
133 137 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
134 138
135 139 loop = Instance('zmq.eventloop.ioloop.IOLoop')
136 140 def _loop_default(self):
137 141 from zmq.eventloop.ioloop import IOLoop
138 142 return IOLoop.instance()
139 143
140 144 aliases = Dict(base_aliases)
141 145 flags = Dict(base_flags)
142 146
143 147 def initialize(self, argv=None):
144 148 """initialize the app"""
145 149 super(BaseParallelApplication, self).initialize(argv)
146 150 self.to_work_dir()
147 151 self.reinit_logging()
148 152
149 153 def to_work_dir(self):
150 154 wd = self.work_dir
151 155 if unicode(wd) != os.getcwdu():
152 156 os.chdir(wd)
153 157 self.log.info("Changing to working dir: %s" % wd)
154 158 # This is the working dir by now.
155 159 sys.path.insert(0, '')
156 160
157 161 def reinit_logging(self):
158 162 # Remove old log files
159 163 log_dir = self.profile_dir.log_dir
160 164 if self.clean_logs:
161 165 for f in os.listdir(log_dir):
162 166 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
163 167 os.remove(os.path.join(log_dir, f))
164 168 if self.log_to_file:
165 169 # Start logging to the new log file
166 170 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
167 171 logfile = os.path.join(log_dir, log_filename)
168 172 open_log_file = open(logfile, 'w')
169 173 else:
170 174 open_log_file = None
171 175 if open_log_file is not None:
172 176 self.log.removeHandler(self._log_handler)
173 177 self._log_handler = logging.StreamHandler(open_log_file)
174 178 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
175 179 self._log_handler.setFormatter(self._log_formatter)
176 180 self.log.addHandler(self._log_handler)
177 181 # do not propagate log messages to root logger
178 182 # ipcluster app will sometimes print duplicate messages during shutdown
179 183 # if this is 1 (default):
180 184 self.log.propagate = False
181 185
182 186 def write_pid_file(self, overwrite=False):
183 187 """Create a .pid file in the pid_dir with my pid.
184 188
185 189 This must be called after `self.profile_dir` is set, which provides `pid_dir`.
186 190 This raises :exc:`PIDFileError` if the pid file already exists, unless `overwrite` is True.
187 191 """
188 192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
189 193 if os.path.isfile(pid_file):
190 194 pid = self.get_pid_from_file()
191 195 if not overwrite:
192 196 raise PIDFileError(
193 197 'The pid file [%s] already exists. \nThis could mean that this '
194 198 'server is already running with [pid=%s].' % (pid_file, pid)
195 199 )
196 200 with open(pid_file, 'w') as f:
197 201 self.log.info("Creating pid file: %s" % pid_file)
198 202 f.write(repr(os.getpid())+'\n')
199 203
200 204 def remove_pid_file(self):
201 205 """Remove the pid file.
202 206
203 207 This should be called at shutdown by registering it as a
204 208 shutdown callback. This needs to return
205 209 ``None``.
206 210 """
207 211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 212 if os.path.isfile(pid_file):
209 213 try:
210 214 self.log.info("Removing pid file: %s" % pid_file)
211 215 os.remove(pid_file)
212 216 except:
213 217 self.log.warn("Error removing the pid file: %s" % pid_file)
214 218
215 219 def get_pid_from_file(self):
216 220 """Get the pid from the pid file.
217 221
218 222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
219 223 """
220 224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
221 225 if os.path.isfile(pid_file):
222 226 with open(pid_file, 'r') as f:
223 227 s = f.read().strip()
224 228 try:
225 229 pid = int(s)
226 230 except:
227 231 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
228 232 return pid
229 233 else:
230 234 raise PIDFileError('pid file not found: %s' % pid_file)
231 235
232 236 def check_pid(self, pid):
233 237 if os.name == 'nt':
234 238 try:
235 239 import ctypes
236 240 # returns 0 if no such process (of ours) exists
237 241 # positive int otherwise
238 242 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
239 243 except Exception:
240 244 self.log.warn(
241 245 "Could not determine whether pid %i is running via `OpenProcess`. "
242 246 " Making the likely assumption that it is."%pid
243 247 )
244 248 return True
245 249 return bool(p)
246 250 else:
247 251 try:
248 252 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
249 253 output,_ = p.communicate()
250 254 except OSError:
251 255 self.log.warn(
252 256 "Could not determine whether pid %i is running via `ps x`. "
253 257 " Making the likely assumption that it is."%pid
254 258 )
255 259 return True
256 260 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
257 261 return pid in pids
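A quick illustration of how the pid-file helpers above fit together; `app` stands
for any initialized BaseParallelApplication subclass, and the flow is a hedged
sketch rather than code from the repository:

    # sketch: round-trip through the pid-file helpers (app is hypothetical)
    app.write_pid_file(overwrite=True)   # writes <pid_dir>/<app.name>.pid
    pid = app.get_pid_from_file()        # parses it back, or raises PIDFileError
    if not app.check_pid(pid):           # `ps x` on POSIX, OpenProcess on Windows
        app.remove_pid_file()            # clean up a stale file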
@@ -1,452 +1,452 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import socket
28 28 import stat
29 29 import sys
30 30 import uuid
31 31
32 32 from multiprocessing import Process
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37 from zmq.utils import jsonapi as json
38 38
39 39 from IPython.config.application import boolean_flag
40 40 from IPython.core.profiledir import ProfileDir
41 41
42 42 from IPython.parallel.apps.baseapp import (
43 43 BaseParallelApplication,
44 44 base_aliases,
45 45 base_flags,
46 46 )
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
49 49
50 50 # from IPython.parallel.controller.controller import ControllerFactory
51 51 from IPython.zmq.session import Session
52 52 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 53 from IPython.parallel.controller.hub import HubFactory
54 54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 56
57 57 from IPython.parallel.util import signal_children, split_url, asbytes
58 58
59 59 # conditional import of MongoDB backend class
60 60
61 61 try:
62 62 from IPython.parallel.controller.mongodb import MongoDB
63 63 except ImportError:
64 64 maybe_mongo = []
65 65 else:
66 66 maybe_mongo = [MongoDB]
67 67
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Module level variables
71 71 #-----------------------------------------------------------------------------
72 72
73 73
74 74 #: The default config file name for this application
75 75 default_config_file_name = u'ipcontroller_config.py'
76 76
77 77
78 78 _description = """Start the IPython controller for parallel computing.
79 79
80 80 The IPython controller provides a gateway between the IPython engines and
81 81 clients. The controller needs to be started before the engines and can be
82 82 configured using command line options or using a cluster directory. Cluster
83 83 directories contain config, log and security files and are usually located in
84 84 your ipython directory and named as "profile_name". See the `profile`
85 85 and `profile-dir` options for details.
86 86 """
87 87
88 88 _examples = """
89 89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 90 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 91 """
92 92
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # The main application
96 96 #-----------------------------------------------------------------------------
97 97 flags = {}
98 98 flags.update(base_flags)
99 99 flags.update({
100 100 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
101 101 'Use threads instead of processes for the schedulers'),
102 102 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
103 103 'use the SQLiteDB backend'),
104 104 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
105 105 'use the MongoDB backend'),
106 106 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
107 107 'use the in-memory DictDB backend'),
108 108 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
109 109 'reuse existing json connection files')
110 110 })
111 111
112 112 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
113 113 "Use HMAC digests for authentication of messages.",
114 114 "Don't authenticate messages."
115 115 ))
116 116 aliases = dict(
117 117 secure = 'IPControllerApp.secure',
118 118 ssh = 'IPControllerApp.ssh_server',
119 119 enginessh = 'IPControllerApp.engine_ssh_server',
120 120 location = 'IPControllerApp.location',
121 121
122 122 ident = 'Session.session',
123 123 user = 'Session.username',
124 124 keyfile = 'Session.keyfile',
125 125
126 126 url = 'HubFactory.url',
127 127 ip = 'HubFactory.ip',
128 128 transport = 'HubFactory.transport',
129 129 port = 'HubFactory.regport',
130 130
131 131 ping = 'HeartMonitor.period',
132 132
133 133 scheme = 'TaskScheduler.scheme_name',
134 134 hwm = 'TaskScheduler.hwm',
135 135 )
136 136 aliases.update(base_aliases)
137 137
138 138
139 139 class IPControllerApp(BaseParallelApplication):
140 140
141 141 name = u'ipcontroller'
142 142 description = _description
143 143 examples = _examples
144 144 config_file_name = Unicode(default_config_file_name)
145 145 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
146 146
147 147 # change default to True
148 148 auto_create = Bool(True, config=True,
149 149 help="""Whether to create profile dir if it doesn't exist.""")
150 150
151 151 reuse_files = Bool(False, config=True,
152 152 help='Whether to reuse existing json connection files.'
153 153 )
154 154 secure = Bool(True, config=True,
155 155 help='Whether to use HMAC digests for extra message authentication.'
156 156 )
157 157 ssh_server = Unicode(u'', config=True,
158 158 help="""ssh url for clients to use when connecting to the Controller
159 159 processes. It should be of the form: [user@]server[:port]. The
160 160 Controller's listening addresses must be accessible from the ssh server""",
161 161 )
162 162 engine_ssh_server = Unicode(u'', config=True,
163 163 help="""ssh url for engines to use when connecting to the Controller
164 164 processes. It should be of the form: [user@]server[:port]. The
165 165 Controller's listening addresses must be accessible from the ssh server""",
166 166 )
167 167 location = Unicode(u'', config=True,
168 168 help="""The external IP or domain name of the Controller, used for disambiguating
169 169 engine and client connections.""",
170 170 )
171 171 import_statements = List([], config=True,
172 172 help="import statements to be run at startup. Necessary in some environments"
173 173 )
174 174
175 175 use_threads = Bool(False, config=True,
176 176 help='Use threads instead of processes for the schedulers',
177 177 )
178 178
179 179 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
180 180 help="JSON filename where engine connection info will be stored.")
181 181 client_json_file = Unicode('ipcontroller-client.json', config=True,
182 182 help="JSON filename where client connection info will be stored.")
183 183
184 184 def _cluster_id_changed(self, name, old, new):
185 185 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
186 self.engine_json_file = "%s-engine.json"%self.name
187 self.client_json_file = "%s-client.json"%self.name
186 self.engine_json_file = "%s-engine.json" % self.name
187 self.client_json_file = "%s-client.json" % self.name
188 188
189 189
190 190 # internal
191 191 children = List()
192 192 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
193 193
194 194 def _use_threads_changed(self, name, old, new):
195 195 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
196 196
197 197 aliases = Dict(aliases)
198 198 flags = Dict(flags)
199 199
200 200
201 201 def save_connection_dict(self, fname, cdict):
202 202 """save a connection dict to json file."""
203 203 c = self.config
204 204 url = cdict['url']
205 205 location = cdict['location']
206 206 if not location:
207 207 try:
208 208 proto,ip,port = split_url(url)
209 209 except AssertionError:
210 210 pass
211 211 else:
212 212 try:
213 213 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
214 214 except (socket.gaierror, IndexError):
215 215 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
216 216 " You may need to specify '--location=<external_ip_address>' to help"
217 217 " IPython decide when to connect via loopback.")
218 218 location = '127.0.0.1'
219 219 cdict['location'] = location
220 220 fname = os.path.join(self.profile_dir.security_dir, fname)
221 221 with open(fname, 'wb') as f:
222 222 f.write(json.dumps(cdict, indent=2))
223 223 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
224 224
225 225 def load_config_from_json(self):
226 226 """load config from existing json connector files."""
227 227 c = self.config
228 228 # load from engine config
229 229 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
230 230 cfg = json.loads(f.read())
231 231 key = c.Session.key = asbytes(cfg['exec_key'])
232 232 xport,addr = cfg['url'].split('://')
233 233 c.HubFactory.engine_transport = xport
234 234 ip,ports = addr.split(':')
235 235 c.HubFactory.engine_ip = ip
236 236 c.HubFactory.regport = int(ports)
237 237 self.location = cfg['location']
238 238 if not self.engine_ssh_server:
239 239 self.engine_ssh_server = cfg['ssh']
240 240 # load client config
241 241 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
242 242 cfg = json.loads(f.read())
243 243 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
244 244 xport,addr = cfg['url'].split('://')
245 245 c.HubFactory.client_transport = xport
246 246 ip,ports = addr.split(':')
247 247 c.HubFactory.client_ip = ip
248 248 if not self.ssh_server:
249 249 self.ssh_server = cfg['ssh']
250 250 assert int(ports) == c.HubFactory.regport, "regport mismatch"
251 251
252 252 def init_hub(self):
253 253 c = self.config
254 254
255 255 self.do_import_statements()
256 256 reusing = self.reuse_files
257 257 if reusing:
258 258 try:
259 259 self.load_config_from_json()
260 260 except (AssertionError,IOError):
261 261 reusing=False
262 262 # check again, because reusing may have failed:
263 263 if reusing:
264 264 pass
265 265 elif self.secure:
266 266 key = str(uuid.uuid4())
267 267 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
268 268 # with open(keyfile, 'w') as f:
269 269 # f.write(key)
270 270 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
271 271 c.Session.key = asbytes(key)
272 272 else:
273 273 key = c.Session.key = b''
274 274
275 275 try:
276 276 self.factory = HubFactory(config=c, log=self.log)
277 277 # self.start_logging()
278 278 self.factory.init_hub()
279 279 except:
280 280 self.log.error("Couldn't construct the Controller", exc_info=True)
281 281 self.exit(1)
282 282
283 283 if not reusing:
284 284 # save to new json config files
285 285 f = self.factory
286 286 cdict = {'exec_key' : key,
287 287 'ssh' : self.ssh_server,
288 288 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
289 289 'location' : self.location
290 290 }
291 291 self.save_connection_dict(self.client_json_file, cdict)
292 292 edict = cdict
293 293 edict['url'] = "%s://%s:%s" % (f.client_transport, f.client_ip, f.regport)
294 294 edict['ssh'] = self.engine_ssh_server
295 295 self.save_connection_dict(self.engine_json_file, edict)
296 296
297 297 #
298 298 def init_schedulers(self):
299 299 children = self.children
300 300 mq = import_item(str(self.mq_class))
301 301
302 302 hub = self.factory
303 303 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
304 304 # IOPub relay (in a Process)
305 305 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
306 306 q.bind_in(hub.client_info['iopub'])
307 307 q.bind_out(hub.engine_info['iopub'])
308 308 q.setsockopt_out(zmq.SUBSCRIBE, b'')
309 309 q.connect_mon(hub.monitor_url)
310 310 q.daemon=True
311 311 children.append(q)
312 312
313 313 # Multiplexer Queue (in a Process)
314 314 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
315 315 q.bind_in(hub.client_info['mux'])
316 316 q.setsockopt_in(zmq.IDENTITY, b'mux')
317 317 q.bind_out(hub.engine_info['mux'])
318 318 q.connect_mon(hub.monitor_url)
319 319 q.daemon=True
320 320 children.append(q)
321 321
322 322 # Control Queue (in a Process)
323 323 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
324 324 q.bind_in(hub.client_info['control'])
325 325 q.setsockopt_in(zmq.IDENTITY, b'control')
326 326 q.bind_out(hub.engine_info['control'])
327 327 q.connect_mon(hub.monitor_url)
328 328 q.daemon=True
329 329 children.append(q)
330 330 try:
331 331 scheme = self.config.TaskScheduler.scheme_name
332 332 except AttributeError:
333 333 scheme = TaskScheduler.scheme_name.get_default_value()
334 334 # Task Queue (in a Process)
335 335 if scheme == 'pure':
336 336 self.log.warn("task::using pure XREQ Task scheduler")
337 337 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
338 338 # q.setsockopt_out(zmq.HWM, hub.hwm)
339 339 q.bind_in(hub.client_info['task'][1])
340 340 q.setsockopt_in(zmq.IDENTITY, b'task')
341 341 q.bind_out(hub.engine_info['task'])
342 342 q.connect_mon(hub.monitor_url)
343 343 q.daemon=True
344 344 children.append(q)
345 345 elif scheme == 'none':
346 346 self.log.warn("task::using no Task scheduler")
347 347
348 348 else:
349 349 self.log.info("task::using Python %s Task scheduler"%scheme)
350 350 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
351 351 hub.monitor_url, hub.client_info['notification'])
352 352 kwargs = dict(logname='scheduler', loglevel=self.log_level,
353 353 log_url = self.log_url, config=dict(self.config))
354 354 if 'Process' in self.mq_class:
355 355 # run the Python scheduler in a Process
356 356 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
357 357 q.daemon=True
358 358 children.append(q)
359 359 else:
360 360 # single-threaded Controller
361 361 kwargs['in_thread'] = True
362 362 launch_scheduler(*sargs, **kwargs)
363 363
364 364
365 365 def save_urls(self):
366 366 """save the registration urls to files."""
367 367 c = self.config
368 368
369 369 sec_dir = self.profile_dir.security_dir
370 370 cf = self.factory
371 371
372 372 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
373 373 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
374 374
375 375 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
376 376 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
377 377
378 378
379 379 def do_import_statements(self):
380 380 statements = self.import_statements
381 381 for s in statements:
382 382 try:
383 383 self.log.info("Executing statement: '%s'" % s)
384 384 exec s in globals(), locals()
385 385 except:
386 386 self.log.error("Error running statement: %s" % s)
387 387
388 388 def forward_logging(self):
389 389 if self.log_url:
390 390 self.log.info("Forwarding logging to %s"%self.log_url)
391 391 context = zmq.Context.instance()
392 392 lsock = context.socket(zmq.PUB)
393 393 lsock.connect(self.log_url)
394 394 handler = PUBHandler(lsock)
395 395 self.log.removeHandler(self._log_handler)
396 396 handler.root_topic = 'controller'
397 397 handler.setLevel(self.log_level)
398 398 self.log.addHandler(handler)
399 399 self._log_handler = handler
400 400 # #
401 401
402 402 def initialize(self, argv=None):
403 403 super(IPControllerApp, self).initialize(argv)
404 404 self.forward_logging()
405 405 self.init_hub()
406 406 self.init_schedulers()
407 407
408 408 def start(self):
409 409 # Start the subprocesses:
410 410 self.factory.start()
411 411 child_procs = []
412 412 for child in self.children:
413 413 child.start()
414 414 if isinstance(child, ProcessMonitoredQueue):
415 415 child_procs.append(child.launcher)
416 416 elif isinstance(child, Process):
417 417 child_procs.append(child)
418 418 if child_procs:
419 419 signal_children(child_procs)
420 420
421 421 self.write_pid_file(overwrite=True)
422 422
423 423 try:
424 424 self.factory.loop.start()
425 425 except KeyboardInterrupt:
426 426 self.log.critical("Interrupted, Exiting...\n")
427 427
428 428
429 429
430 430 def launch_new_instance():
431 431 """Create and run the IPython controller"""
432 432 if sys.platform == 'win32':
433 433 # make sure we don't get called from a multiprocessing subprocess
434 434 # this can result in infinite Controllers being started on Windows
435 435 # which doesn't have a proper fork, so multiprocessing is wonky
436 436
437 437 # this only comes up when IPython has been installed using vanilla
438 438 # setuptools, and *not* distribute.
439 439 import multiprocessing
440 440 p = multiprocessing.current_process()
441 441 # the main process has name 'MainProcess'
442 442 # subprocesses will have names like 'Process-1'
443 443 if p.name != 'MainProcess':
444 444 # we are a subprocess, don't start another Controller!
445 445 return
446 446 app = IPControllerApp.instance()
447 447 app.initialize()
448 448 app.start()
449 449
450 450
451 451 if __name__ == '__main__':
452 452 launch_new_instance()
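For reference, the connection dict that `save_connection_dict` serializes is built
in `init_hub` with exactly these four keys; the values below are illustrative
placeholders, not real connection info:

    # sketch of ipcontroller-client.json contents (all values made up)
    cdict = {
        'exec_key' : 'c5466f23-...',          # uuid4 HMAC key, or '' with --no-secure
        'ssh'      : '',                      # optional [user@]server[:port]
        'url'      : 'tcp://10.0.0.1:12345',  # transport://ip:regport of the Hub
        'location' : '10.0.0.1',              # external IP, for disambiguating loopback
    }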
@@ -1,344 +1,344 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 )
38 38 from IPython.zmq.log import EnginePUBHandler
39 39
40 40 from IPython.config.configurable import Configurable
41 41 from IPython.zmq.session import Session
42 42 from IPython.parallel.engine.engine import EngineFactory
43 43 from IPython.parallel.engine.streamkernel import Kernel
44 44 from IPython.parallel.util import disambiguate_url, asbytes
45 45
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Module level variables
52 52 #-----------------------------------------------------------------------------
53 53
54 54 #: The default config file name for this application
55 55 default_config_file_name = u'ipengine_config.py'
56 56
57 57 _description = """Start an IPython engine for parallel computing.
58 58
59 59 IPython engines run in parallel and perform computations on behalf of a client
60 60 and controller. A controller needs to be started before the engines. The
61 61 engine can be configured using command line options or using a cluster
62 62 directory. Cluster directories contain config, log and security files and are
63 63 usually located in your ipython directory and named as "profile_name".
64 64 See the `profile` and `profile-dir` options for details.
65 65 """
66 66
67 67 _examples = """
68 68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
69 69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
70 70 """
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # MPI configuration
74 74 #-----------------------------------------------------------------------------
75 75
76 76 mpi4py_init = """from mpi4py import MPI as mpi
77 77 mpi.size = mpi.COMM_WORLD.Get_size()
78 78 mpi.rank = mpi.COMM_WORLD.Get_rank()
79 79 """
80 80
81 81
82 82 pytrilinos_init = """from PyTrilinos import Epetra
83 83 class SimpleStruct:
84 84 pass
85 85 mpi = SimpleStruct()
86 86 mpi.rank = 0
87 87 mpi.size = 0
88 88 """
89 89
90 90 class MPI(Configurable):
91 91 """Configurable for MPI initialization"""
92 92 use = Unicode('', config=True,
93 93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 94 )
95 95
96 def _on_use_changed(self, old, new):
96 def _use_changed(self, name, old, new):
97 97 # load default init script if it's not set
98 98 if not self.init_script:
99 99 self.init_script = self.default_inits.get(new, '')
100 100
101 101 init_script = Unicode('', config=True,
102 102 help="Initialization code for MPI")
103 103
104 104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
105 105 config=True)
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Main application
110 110 #-----------------------------------------------------------------------------
111 111 aliases = dict(
112 112 file = 'IPEngineApp.url_file',
113 113 c = 'IPEngineApp.startup_command',
114 114 s = 'IPEngineApp.startup_script',
115 115
116 116 ident = 'Session.session',
117 117 user = 'Session.username',
118 118 keyfile = 'Session.keyfile',
119 119
120 120 url = 'EngineFactory.url',
121 121 ssh = 'EngineFactory.sshserver',
122 122 sshkey = 'EngineFactory.sshkey',
123 123 ip = 'EngineFactory.ip',
124 124 transport = 'EngineFactory.transport',
125 125 port = 'EngineFactory.regport',
126 126 location = 'EngineFactory.location',
127 127
128 128 timeout = 'EngineFactory.timeout',
129 129
130 130 mpi = 'MPI.use',
131 131
132 132 )
133 133 aliases.update(base_aliases)
134 134
135 135
136 136 class IPEngineApp(BaseParallelApplication):
137 137
138 138 name = 'ipengine'
139 139 description = _description
140 140 examples = _examples
141 141 config_file_name = Unicode(default_config_file_name)
142 142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
143 143
144 144 startup_script = Unicode(u'', config=True,
145 145 help='specify a script to be run at startup')
146 146 startup_command = Unicode('', config=True,
147 147 help='specify a command to be run at startup')
148 148
149 149 url_file = Unicode(u'', config=True,
150 150 help="""The full location of the file containing the connection information for
151 151 the controller. If this is not given, the file must be in the
152 152 security directory of the cluster directory. This location is
153 153 resolved using the `profile` or `profile_dir` options.""",
154 154 )
155 155 wait_for_url_file = Float(5, config=True,
156 156 help="""The maximum number of seconds to wait for url_file to exist.
157 157 This is useful for batch-systems and shared-filesystems where the
158 158 controller and engine are started at the same time and it
159 159 may take a moment for the controller to write the connector files.""")
160 160
161 161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
162 162
163 163 def _cluster_id_changed(self, name, old, new):
164 164 if new:
165 base = 'ipcontroller-%s'%new
165 base = 'ipcontroller-%s' % new
166 166 else:
167 167 base = 'ipcontroller'
168 self.url_file_name = "%s-engine.json"%base
168 self.url_file_name = "%s-engine.json" % base
169 169
170 170 log_url = Unicode('', config=True,
171 171 help="""The URL for the iploggerapp instance, for forwarding
172 172 logging to a central location.""")
173 173
174 174 aliases = Dict(aliases)
175 175
176 176 # def find_key_file(self):
177 177 # """Set the key file.
178 178 #
179 179 # Here we don't try to actually see if it exists or is valid, as that
180 180 # is handled by the connection logic.
181 181 # """
182 182 # config = self.master_config
183 183 # # Find the actual controller key file
184 184 # if not config.Global.key_file:
185 185 # try_this = os.path.join(
186 186 # config.Global.profile_dir,
187 187 # config.Global.security_dir,
188 188 # config.Global.key_file_name
189 189 # )
190 190 # config.Global.key_file = try_this
191 191
192 192 def find_url_file(self):
193 193 """Set the url file.
194 194
195 195 Here we don't try to actually see if it exists or is valid, as that
196 196 is handled by the connection logic.
197 197 """
198 198 config = self.config
199 199 # Find the actual controller url file
200 200 if not self.url_file:
201 201 self.url_file = os.path.join(
202 202 self.profile_dir.security_dir,
203 203 self.url_file_name
204 204 )
205 205
206 206 def load_connector_file(self):
207 207 """load config from a JSON connector file,
208 208 at a *lower* priority than command-line/config files.
209 209 """
210 210
211 211 self.log.info("Loading url_file %r"%self.url_file)
212 212 config = self.config
213 213
214 214 with open(self.url_file) as f:
215 215 d = json.loads(f.read())
216 216
217 217 try:
218 218 config.Session.key
219 219 except AttributeError:
220 220 if d['exec_key']:
221 221 config.Session.key = asbytes(d['exec_key'])
222 222
223 223 try:
224 224 config.EngineFactory.location
225 225 except AttributeError:
226 226 config.EngineFactory.location = d['location']
227 227
228 228 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
229 229 try:
230 230 config.EngineFactory.url
231 231 except AttributeError:
232 232 config.EngineFactory.url = d['url']
233 233
234 234 try:
235 235 config.EngineFactory.sshserver
236 236 except AttributeError:
237 237 config.EngineFactory.sshserver = d['ssh']
238 238
239 239 def init_engine(self):
240 240 # This is the working dir by now.
241 241 sys.path.insert(0, '')
242 242 config = self.config
243 243 # print config
244 244 self.find_url_file()
245 245
246 246 # was the url manually specified?
247 247 keys = set(self.config.EngineFactory.keys())
248 248 keys = keys.union(set(self.config.RegistrationFactory.keys()))
249 249
250 250 if keys.intersection(set(['ip', 'url', 'port'])):
251 251 # Connection info was specified, don't wait for the file
252 252 url_specified = True
253 253 self.wait_for_url_file = 0
254 254 else:
255 255 url_specified = False
256 256
257 257 if self.wait_for_url_file and not os.path.exists(self.url_file):
258 258 self.log.warn("url_file %r not found"%self.url_file)
259 259 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
260 260 tic = time.time()
261 261 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
262 262 # wait for url_file to exist, for up to `wait_for_url_file` seconds
263 263 time.sleep(0.1)
264 264
265 265 if os.path.exists(self.url_file):
266 266 self.load_connector_file()
267 267 elif not url_specified:
268 268 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
269 269 self.exit(1)
270 270
271 271
272 272 try:
273 273 exec_lines = config.Kernel.exec_lines
274 274 except AttributeError:
275 275 config.Kernel.exec_lines = []
276 276 exec_lines = config.Kernel.exec_lines
277 277
278 278 if self.startup_script:
279 279 enc = sys.getfilesystemencoding() or 'utf8'
280 280 cmd="execfile(%r)"%self.startup_script.encode(enc)
281 281 exec_lines.append(cmd)
282 282 if self.startup_command:
283 283 exec_lines.append(self.startup_command)
284 284
285 285 # Create the underlying shell class and Engine
286 286 # shell_class = import_item(self.master_config.Global.shell_class)
287 287 # print self.config
288 288 try:
289 289 self.engine = EngineFactory(config=config, log=self.log)
290 290 except:
291 291 self.log.error("Couldn't start the Engine", exc_info=True)
292 292 self.exit(1)
293 293
294 294 def forward_logging(self):
295 295 if self.log_url:
296 296 self.log.info("Forwarding logging to %s"%self.log_url)
297 297 context = self.engine.context
298 298 lsock = context.socket(zmq.PUB)
299 299 lsock.connect(self.log_url)
300 300 self.log.removeHandler(self._log_handler)
301 301 handler = EnginePUBHandler(self.engine, lsock)
302 302 handler.setLevel(self.log_level)
303 303 self.log.addHandler(handler)
304 304 self._log_handler = handler
305 305
306 306 def init_mpi(self):
307 307 global mpi
308 308 self.mpi = MPI(config=self.config)
309 309
310 310 mpi_import_statement = self.mpi.init_script
311 311 if mpi_import_statement:
312 312 try:
313 313 self.log.info("Initializing MPI:")
314 314 self.log.info(mpi_import_statement)
315 315 exec mpi_import_statement in globals()
316 316 except:
317 317 mpi = None
318 318 else:
319 319 mpi = None
320 320
321 321 def initialize(self, argv=None):
322 322 super(IPEngineApp, self).initialize(argv)
323 323 self.init_mpi()
324 324 self.init_engine()
325 325 self.forward_logging()
326 326
327 327 def start(self):
328 328 self.engine.start()
329 329 try:
330 330 self.engine.loop.start()
331 331 except KeyboardInterrupt:
332 332 self.log.critical("Engine Interrupted, shutting down...\n")
333 333
334 334
335 335 def launch_new_instance():
336 336 """Create and run the IPython engine"""
337 337 app = IPEngineApp.instance()
338 338 app.initialize()
339 339 app.start()
340 340
341 341
342 342 if __name__ == '__main__':
343 343 launch_new_instance()
344 344
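The MPI configurable above can be driven from the command line (the `mpi` alias
maps to `MPI.use`) or from a config file; a minimal sketch for ipengine_config.py,
with the explicit init_script line as a hypothetical alternative:

    # sketch: equivalent to `ipengine --mpi=mpi4py`
    c = get_config()
    c.MPI.use = 'mpi4py'       # _use_changed loads mpi4py_init as init_script
    # or supply custom initialization code directly (hypothetical):
    # c.MPI.init_script = 'from mpi4py import MPI as mpi'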
@@ -1,1169 +1,1176 @@
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import re
26 26 import stat
27 27 import time
28 28
29 29 # signal imports, handling various platforms, versions
30 30
31 31 from signal import SIGINT, SIGTERM
32 32 try:
33 33 from signal import SIGKILL
34 34 except ImportError:
35 35 # Windows
36 36 SIGKILL=SIGTERM
37 37
38 38 try:
39 39 # Windows, Python >= 2.7 or >= 3.2
40 40 from signal import CTRL_C_EVENT as SIGINT
41 41 except ImportError:
42 42 pass
43 43
44 44 from subprocess import Popen, PIPE, STDOUT
45 45 try:
46 46 from subprocess import check_output
47 47 except ImportError:
48 48 # pre-2.7, define check_output with Popen
49 49 def check_output(*args, **kwargs):
50 50 kwargs.update(dict(stdout=PIPE))
51 51 p = Popen(*args, **kwargs)
52 52 out,err = p.communicate()
53 53 return out
54 54
55 55 from zmq.eventloop import ioloop
56 56
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 60 from IPython.utils.traitlets import (
61 61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 62 )
63 63 from IPython.utils.path import get_ipython_module_path
64 64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65 65
66 66 from .win32support import forward_read_events
67 67
68 68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 69
70 70 WINDOWS = os.name == 'nt'
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Paths to the kernel apps
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 78 'IPython.parallel.apps.ipclusterapp'
79 79 ))
80 80
81 81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 82 'IPython.parallel.apps.ipengineapp'
83 83 ))
84 84
85 85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 86 'IPython.parallel.apps.ipcontrollerapp'
87 87 ))
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # Base launchers and errors
91 91 #-----------------------------------------------------------------------------
92 92
93 93
94 94 class LauncherError(Exception):
95 95 pass
96 96
97 97
98 98 class ProcessStateError(LauncherError):
99 99 pass
100 100
101 101
102 102 class UnknownStatus(LauncherError):
103 103 pass
104 104
105 105
106 106 class BaseLauncher(LoggingConfigurable):
107 107 """An asbtraction for starting, stopping and signaling a process."""
108 108
109 109 # In all of the launchers, the work_dir is where child processes will be
110 110 # run. This will usually be the profile_dir, but may not be. any work_dir
111 111 # passed into the __init__ method will override the config value.
112 112 # This should not be used to set the work_dir for the actual engine
113 113 # and controller. Instead, use their own config files or the
114 114 # controller_args, engine_args attributes of the launchers to add
115 115 # the work_dir option.
116 116 work_dir = Unicode(u'.')
117 117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 118
119 119 start_data = Any()
120 120 stop_data = Any()
121 121
122 122 def _loop_default(self):
123 123 return ioloop.IOLoop.instance()
124 124
125 125 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 127 self.state = 'before' # can be before, running, after
128 128 self.stop_callbacks = []
129 129 self.start_data = None
130 130 self.stop_data = None
131 131
132 132 @property
133 133 def args(self):
134 134 """A list of cmd and args that will be used to start the process.
135 135
136 136 This is what is passed to :func:`spawnProcess` and the first element
137 137 will be the process name.
138 138 """
139 139 return self.find_args()
140 140
141 141 def find_args(self):
142 142 """The ``.args`` property calls this to find the args list.
143 143
144 144 Subcommand should implement this to construct the cmd and args.
145 145 """
146 146 raise NotImplementedError('find_args must be implemented in a subclass')
147 147
148 148 @property
149 149 def arg_str(self):
150 150 """The string form of the program arguments."""
151 151 return ' '.join(self.args)
152 152
153 153 @property
154 154 def running(self):
155 155 """Am I running."""
156 156 if self.state == 'running':
157 157 return True
158 158 else:
159 159 return False
160 160
161 161 def start(self):
162 162 """Start the process."""
163 163 raise NotImplementedError('start must be implemented in a subclass')
164 164
165 165 def stop(self):
166 166 """Stop the process and notify observers of stopping.
167 167
168 168 This method will return None immediately.
169 169 To observe the actual process stopping, see :meth:`on_stop`.
170 170 """
171 171 raise NotImplementedError('stop must be implemented in a subclass')
172 172
173 173 def on_stop(self, f):
174 174 """Register a callback to be called with this Launcher's stop_data
175 175 when the process actually finishes.
176 176 """
177 177 if self.state=='after':
178 178 return f(self.stop_data)
179 179 else:
180 180 self.stop_callbacks.append(f)
181 181
182 182 def notify_start(self, data):
183 183 """Call this to trigger startup actions.
184 184
185 185 This logs the process startup and sets the state to 'running'. It is
186 186 a pass-through so it can be used as a callback.
187 187 """
188 188
189 189 self.log.info('Process %r started: %r' % (self.args[0], data))
190 190 self.start_data = data
191 191 self.state = 'running'
192 192 return data
193 193
194 194 def notify_stop(self, data):
195 195 """Call this to trigger process stop actions.
196 196
197 197 This logs the process stopping and sets the state to 'after'. Call
198 198 this to trigger callbacks registered via :meth:`on_stop`."""
199 199
200 200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
201 201 self.stop_data = data
202 202 self.state = 'after'
203 203 for i in range(len(self.stop_callbacks)):
204 204 d = self.stop_callbacks.pop()
205 205 d(data)
206 206 return data
207 207
208 208 def signal(self, sig):
209 209 """Signal the process.
210 210
211 211 Parameters
212 212 ----------
213 213 sig : str or int
214 214 'KILL', 'INT', etc., or any signal number
215 215 """
216 216 raise NotImplementedError('signal must be implemented in a subclass')
217 217
218 218 class ClusterAppMixin(HasTraits):
219 219 """MixIn for cluster args as traits"""
220 220 cluster_args = List([])
221 221 profile_dir=Unicode('')
222 222 cluster_id=Unicode('')
223 223 def _profile_dir_changed(self, name, old, new):
224 224 self.cluster_args = []
225 225 if self.profile_dir:
226 226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 227 if self.cluster_id:
228 228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 229 _cluster_id_changed = _profile_dir_changed
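# e.g. (hypothetical values): profile_dir=u'/home/me/.ipython/profile_default'
# and cluster_id=u'a' make cluster_args ==
# ['--profile-dir', u'/home/me/.ipython/profile_default', '--cluster-id', u'a']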
230 230
231 231 class ControllerMixin(ClusterAppMixin):
232 232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 233 help="""Popen command to launch ipcontroller.""")
234 234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 236 help="""command-line args to pass to ipcontroller""")
237 237
238 238 class EngineMixin(ClusterAppMixin):
239 239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 240 help="""command to launch the Engine.""")
241 241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 243 help="command-line arguments to pass to ipengine"
244 244 )
245 245
246 246 #-----------------------------------------------------------------------------
247 247 # Local process launchers
248 248 #-----------------------------------------------------------------------------
249 249
250 250
251 251 class LocalProcessLauncher(BaseLauncher):
252 252 """Start and stop an external process in an asynchronous manner.
253 253
254 254 This will launch the external process with a working directory of
255 255 ``self.work_dir``.
256 256 """
257 257
258 258 # This is used to construct self.args, which is passed to
259 259 # spawnProcess.
260 260 cmd_and_args = List([])
261 261 poll_frequency = Int(100) # in ms
262 262
263 263 def __init__(self, work_dir=u'.', config=None, **kwargs):
264 264 super(LocalProcessLauncher, self).__init__(
265 265 work_dir=work_dir, config=config, **kwargs
266 266 )
267 267 self.process = None
268 268 self.poller = None
269 269
270 270 def find_args(self):
271 271 return self.cmd_and_args
272 272
273 273 def start(self):
274 274 if self.state == 'before':
275 275 self.process = Popen(self.args,
276 276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
277 277 env=os.environ,
278 278 cwd=self.work_dir
279 279 )
280 280 if WINDOWS:
281 281 self.stdout = forward_read_events(self.process.stdout)
282 282 self.stderr = forward_read_events(self.process.stderr)
283 283 else:
284 284 self.stdout = self.process.stdout.fileno()
285 285 self.stderr = self.process.stderr.fileno()
286 286 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
287 287 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
288 288 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
289 289 self.poller.start()
290 290 self.notify_start(self.process.pid)
291 291 else:
292 292 s = 'The process was already started and has state: %r' % self.state
293 293 raise ProcessStateError(s)
294 294
295 295 def stop(self):
296 296 return self.interrupt_then_kill()
297 297
298 298 def signal(self, sig):
299 299 if self.state == 'running':
300 300 if WINDOWS and sig != SIGINT:
301 301 # use Windows tree-kill for better child cleanup
302 302 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
303 303 else:
304 304 self.process.send_signal(sig)
305 305
306 306 def interrupt_then_kill(self, delay=2.0):
307 307 """Send INT, wait a delay and then send KILL."""
308 308 try:
309 309 self.signal(SIGINT)
310 310 except Exception:
311 311 self.log.debug("interrupt failed")
312 312 pass
313 313 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
314 314 self.killer.start()
315 315
316 316 # callbacks, etc:
317 317
318 318 def handle_stdout(self, fd, events):
319 319 if WINDOWS:
320 320 line = self.stdout.recv()
321 321 else:
322 322 line = self.process.stdout.readline()
323 323 # a stopped process will be readable but return empty strings
324 324 if line:
325 325 self.log.info(line[:-1])
326 326 else:
327 327 self.poll()
328 328
329 329 def handle_stderr(self, fd, events):
330 330 if WINDOWS:
331 331 line = self.stderr.recv()
332 332 else:
333 333 line = self.process.stderr.readline()
334 334 # a stopped process will be readable but return empty strings
335 335 if line:
336 336 self.log.error(line[:-1])
337 337 else:
338 338 self.poll()
339 339
340 340 def poll(self):
341 341 status = self.process.poll()
342 342 if status is not None:
343 343 self.poller.stop()
344 344 self.loop.remove_handler(self.stdout)
345 345 self.loop.remove_handler(self.stderr)
346 346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
347 347 return status
348 348
349 349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
350 350 """Launch a controller as a regular external process."""
351 351
352 352 def find_args(self):
353 353 return self.controller_cmd + self.cluster_args + self.controller_args
354 354
355 355 def start(self):
356 356 """Start the controller by profile_dir."""
357 357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
358 358 return super(LocalControllerLauncher, self).start()
359 359
360 360
361 361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
362 362 """Launch a single engine as a regular externall process."""
363 363
364 364 def find_args(self):
365 365 return self.engine_cmd + self.cluster_args + self.engine_args
366 366
367 367
368 368 class LocalEngineSetLauncher(LocalEngineLauncher):
369 369 """Launch a set of engines as regular external processes."""
370 370
371 371 delay = CFloat(0.1, config=True,
372 372 help="""delay (in seconds) between starting each engine after the first.
373 373 This can help force the engines to get their ids in order, or limit
374 374 process flood when starting many engines."""
375 375 )
376 376
377 377 # launcher class
378 378 launcher_class = LocalEngineLauncher
379 379
380 380 launchers = Dict()
381 381 stop_data = Dict()
382 382
383 383 def __init__(self, work_dir=u'.', config=None, **kwargs):
384 384 super(LocalEngineSetLauncher, self).__init__(
385 385 work_dir=work_dir, config=config, **kwargs
386 386 )
387 387 self.stop_data = {}
388 388
389 389 def start(self, n):
390 390 """Start n engines by profile or profile_dir."""
391 391 dlist = []
392 392 for i in range(n):
393 393 if i > 0:
394 394 time.sleep(self.delay)
395 395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 397 )
398 398
399 399 # Copy the engine args over to each engine launcher.
400 400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
401 401 el.engine_args = copy.deepcopy(self.engine_args)
402 402 el.on_stop(self._notice_engine_stopped)
403 403 d = el.start()
404 404 if i==0:
405 405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
406 406 self.launchers[i] = el
407 407 dlist.append(d)
408 408 self.notify_start(dlist)
409 409 return dlist
410 410
411 411 def find_args(self):
412 412 return ['engine set']
413 413
414 414 def signal(self, sig):
415 415 dlist = []
416 416 for el in self.launchers.itervalues():
417 417 d = el.signal(sig)
418 418 dlist.append(d)
419 419 return dlist
420 420
421 421 def interrupt_then_kill(self, delay=1.0):
422 422 dlist = []
423 423 for el in self.launchers.itervalues():
424 424 d = el.interrupt_then_kill(delay)
425 425 dlist.append(d)
426 426 return dlist
427 427
428 428 def stop(self):
429 429 return self.interrupt_then_kill()
430 430
431 431 def _notice_engine_stopped(self, data):
432 432 pid = data['pid']
433 433 for idx,el in self.launchers.iteritems():
434 434 if el.process.pid == pid:
435 435 break
436 436 self.launchers.pop(idx)
437 437 self.stop_data[idx] = data
438 438 if not self.launchers:
439 439 self.notify_stop(self.stop_data)
440 440
441 441
442 442 #-----------------------------------------------------------------------------
443 443 # MPIExec launchers
444 444 #-----------------------------------------------------------------------------
445 445
446 446
447 447 class MPIExecLauncher(LocalProcessLauncher):
448 448 """Launch an external process using mpiexec."""
449 449
450 450 mpi_cmd = List(['mpiexec'], config=True,
451 451 help="The mpiexec command to use in starting the process."
452 452 )
453 453 mpi_args = List([], config=True,
454 454 help="The command line arguments to pass to mpiexec."
455 455 )
456 456 program = List(['date'],
457 457 help="The program to start via mpiexec.")
458 458 program_args = List([],
459 459 help="The command line argument to the program."
460 460 )
461 461 n = Int(1)
462 462
463 463 def find_args(self):
464 464 """Build self.args using all the fields."""
465 465 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
466 466 self.program + self.program_args
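# e.g. (illustrative): with n=4 and the default traits above, this returns
# ['mpiexec', '-n', '4', 'date']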
467 467
468 468 def start(self, n):
469 469 """Start n instances of the program using mpiexec."""
470 470 self.n = n
471 471 return super(MPIExecLauncher, self).start()
472 472
473 473
474 474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
475 475 """Launch a controller using mpiexec."""
476 476
477 477 # alias back to *non-configurable* program[_args] for use in find_args()
478 478 # this way all Controller/EngineSetLaunchers have the same form, rather
479 479 # than *some* having `program_args` and others `controller_args`
480 480 @property
481 481 def program(self):
482 482 return self.controller_cmd
483 483
484 484 @property
485 485 def program_args(self):
486 486 return self.cluster_args + self.controller_args
487 487
488 488 def start(self):
489 489 """Start the controller by profile_dir."""
490 490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
491 491 return super(MPIExecControllerLauncher, self).start(1)
492 492
493 493
494 494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 495 """Launch engines using mpiexec"""
496 496
497 497 # alias back to *non-configurable* program[_args] for use in find_args()
498 498 # this way all Controller/EngineSetLaunchers have the same form, rather
499 499 # than *some* having `program_args` and others `controller_args`
500 500 @property
501 501 def program(self):
502 502 return self.engine_cmd
503 503
504 504 @property
505 505 def program_args(self):
506 506 return self.cluster_args + self.engine_args
507 507
508 508 def start(self, n):
509 509 """Start n engines by profile or profile_dir."""
510 510 self.n = n
511 511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 512 return super(MPIExecEngineSetLauncher, self).start(n)
513 513
514 514 #-----------------------------------------------------------------------------
515 515 # SSH launchers
516 516 #-----------------------------------------------------------------------------
517 517
518 518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
519 519
520 520 class SSHLauncher(LocalProcessLauncher):
521 521 """A minimal launcher for ssh.
522 522
523 523 To be useful this will probably have to be extended to use the ``sshx``
524 524 idea for environment variables. There could be other things this needs
525 525 as well.
526 526 """
527 527
528 528 ssh_cmd = List(['ssh'], config=True,
529 529 help="command for starting ssh")
530 530 ssh_args = List(['-tt'], config=True,
531 531 help="args to pass to ssh")
532 532 program = List(['date'],
533 533 help="Program to launch via ssh")
534 534 program_args = List([],
535 535 help="args to pass to remote program")
536 536 hostname = Unicode('', config=True,
537 537 help="hostname on which to launch the program")
538 538 user = Unicode('', config=True,
539 539 help="username for ssh")
540 540 location = Unicode('', config=True,
541 541 help="user@hostname location for ssh in one setting")
542 542
543 543 def _hostname_changed(self, name, old, new):
544 544 if self.user:
545 545 self.location = u'%s@%s' % (self.user, new)
546 546 else:
547 547 self.location = new
548 548
549 549 def _user_changed(self, name, old, new):
550 550 self.location = u'%s@%s' % (new, self.hostname)
551 551
552 552 def find_args(self):
553 553 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 554 self.program + self.program_args
555 555
556 556 def start(self, hostname=None, user=None):
557 557 if hostname is not None:
558 558 self.hostname = hostname
559 559 if user is not None:
560 560 self.user = user
561 561
562 562 return super(SSHLauncher, self).start()
563 563
564 564 def signal(self, sig):
565 565 if self.state == 'running':
566 566 # send escaped ssh connection-closer
567 567 self.process.stdin.write('~.')
568 568 self.process.stdin.flush()
569 569
570 570
571 571
572 572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
573 573
574 574 # alias back to *non-configurable* program[_args] for use in find_args()
575 575 # this way all Controller/EngineSetLaunchers have the same form, rather
576 576 # than *some* having `program_args` and others `controller_args`
577 577 @property
578 578 def program(self):
579 579 return self.controller_cmd
580 580
581 581 @property
582 582 def program_args(self):
583 583 return self.cluster_args + self.controller_args
584 584
585 585
586 586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
587 587
588 588 # alias back to *non-configurable* program[_args] for use in find_args()
589 589 # this way all Controller/EngineSetLaunchers have the same form, rather
590 590 # than *some* having `program_args` and others `controller_args`
591 591 @property
592 592 def program(self):
593 593 return self.engine_cmd
594 594
595 595 @property
596 596 def program_args(self):
597 597 return self.cluster_args + self.engine_args
598 598
599 599
600 600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
601 601 launcher_class = SSHEngineLauncher
602 602 engines = Dict(config=True,
603 603 help="""dict of engines to launch. This is a dict by hostname of ints,
604 604 corresponding to the number of engines to start on that host.""")
605 605
606 606 def start(self, n):
607 607 """Start engines by profile or profile_dir.
608 608 `n` is ignored, and the `engines` config property is used instead.
609 609 """
610 610
611 611 dlist = []
612 612 for host, n in self.engines.iteritems():
613 613 if isinstance(n, (tuple, list)):
614 614 n, args = n
615 615 else:
616 616 args = copy.deepcopy(self.engine_args)
617 617
618 618 if '@' in host:
619 619 user,host = host.split('@',1)
620 620 else:
621 621 user=None
622 622 for i in range(n):
623 623 if i > 0:
624 624 time.sleep(self.delay)
625 625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 627 )
628 628
629 629 # Copy the engine args over to each engine launcher.
630 630 el.engine_cmd = self.engine_cmd
631 631 el.engine_args = args
632 632 el.on_stop(self._notice_engine_stopped)
633 633 d = el.start(user=user, hostname=host)
634 634 if i==0:
635 635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
636 636 self.launchers[host+str(i)] = el
637 637 dlist.append(d)
638 638 self.notify_start(dlist)
639 639 return dlist
640 640
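A minimal configuration sketch for this launcher, matching the int-or-(n, args) values that start() accepts (hostnames and the extra argument are hypothetical):

    # illustrative ipcluster_config.py snippet
    c.SSHEngineSetLauncher.engines = {
        'host1.example.com' : 2,                              # 2 engines, default args
        'user@host2.example.com' : (4, ['--log-level=20']),   # 4 engines with extra args
    }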
641 641
642 642
643 643 #-----------------------------------------------------------------------------
644 644 # Windows HPC Server 2008 scheduler launchers
645 645 #-----------------------------------------------------------------------------
646 646
647 647
648 648 # This is only used on Windows.
649 649 def find_job_cmd():
650 650 if WINDOWS:
651 651 try:
652 652 return find_cmd('job')
653 653 except (FindCmdError, ImportError):
654 654 # ImportError will be raised if win32api is not installed
655 655 return 'job'
656 656 else:
657 657 return 'job'
658 658
659 659
660 660 class WindowsHPCLauncher(BaseLauncher):
661 661
662 662 job_id_regexp = Unicode(r'\d+', config=True,
663 663 help="""A regular expression used to get the job id from the output of the
664 664 submit_command. """
665 665 )
666 666 job_file_name = Unicode(u'ipython_job.xml', config=True,
667 667 help="The filename of the instantiated job script.")
668 668 # The full path to the instantiated job script. This gets made dynamically
669 669 # by combining the work_dir with the job_file_name.
670 670 job_file = Unicode(u'')
671 671 scheduler = Unicode('', config=True,
672 672 help="The hostname of the scheduler to submit the job to.")
673 673 job_cmd = Unicode(find_job_cmd(), config=True,
674 674 help="The command for submitting jobs.")
675 675
676 676 def __init__(self, work_dir=u'.', config=None, **kwargs):
677 677 super(WindowsHPCLauncher, self).__init__(
678 678 work_dir=work_dir, config=config, **kwargs
679 679 )
680 680
681 681 @property
682 682 def job_file(self):
683 683 return os.path.join(self.work_dir, self.job_file_name)
684 684
685 685 def write_job_file(self, n):
686 686 raise NotImplementedError("Implement write_job_file in a subclass.")
687 687
688 688 def find_args(self):
689 689 return [u'job.exe']
690 690
691 691 def parse_job_id(self, output):
692 692 """Take the output of the submit command and return the job id."""
693 693 m = re.search(self.job_id_regexp, output)
694 694 if m is not None:
695 695 job_id = m.group()
696 696 else:
697 697 raise LauncherError("Job id couldn't be determined: %s" % output)
698 698 self.job_id = job_id
699 699 self.log.info('Job started with job id: %r' % job_id)
700 700 return job_id
701 701
702 702 def start(self, n):
703 703 """Start n copies of the process using the Win HPC job scheduler."""
704 704 self.write_job_file(n)
705 705 args = [
706 706 'submit',
707 707 '/jobfile:%s' % self.job_file,
708 708 '/scheduler:%s' % self.scheduler
709 709 ]
710 710 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
711 711
712 712 output = check_output([self.job_cmd]+args,
713 713 env=os.environ,
714 714 cwd=self.work_dir,
715 715 stderr=STDOUT
716 716 )
717 717 job_id = self.parse_job_id(output)
718 718 self.notify_start(job_id)
719 719 return job_id
720 720
721 721 def stop(self):
722 722 args = [
723 723 'cancel',
724 724 self.job_id,
725 725 '/scheduler:%s' % self.scheduler
726 726 ]
727 727 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
728 728 try:
729 729 output = check_output([self.job_cmd]+args,
730 730 env=os.environ,
731 731 cwd=self.work_dir,
732 732 stderr=STDOUT
733 733 )
734 734 except:
735 735 output = 'The job already appears to be stopped: %r' % self.job_id
736 736 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
737 737 return output
738 738
739 739
740 740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
741 741
742 742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
743 743 help="WinHPC xml job file.")
744 744 controller_args = List([], config=False,
745 745 help="extra args to pass to ipcontroller")
746 746
747 747 def write_job_file(self, n):
748 748 job = IPControllerJob(config=self.config)
749 749
750 750 t = IPControllerTask(config=self.config)
751 751 # The task's work directory is *not* the actual work directory of
752 752 # the controller. It is used as the base path for the stdout/stderr
753 753 # files that the scheduler redirects to.
754 754 t.work_directory = self.profile_dir
755 755 # Add the cluster args (profile_dir, cluster_id) and any extra controller args.
756 756 t.controller_args.extend(self.cluster_args)
757 757 t.controller_args.extend(self.controller_args)
758 758 job.add_task(t)
759 759
760 760 self.log.info("Writing job description file: %s" % self.job_file)
761 761 job.write(self.job_file)
762 762
763 763 @property
764 764 def job_file(self):
765 765 return os.path.join(self.profile_dir, self.job_file_name)
766 766
767 767 def start(self):
768 768 """Start the controller by profile_dir."""
769 769 return super(WindowsHPCControllerLauncher, self).start(1)
770 770
771 771
772 772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
773 773
774 774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
775 775 help="jobfile for ipengines job")
776 776 engine_args = List([], config=False,
777 777 help="extra args to pas to ipengine")
778 778
779 779 def write_job_file(self, n):
780 780 job = IPEngineSetJob(config=self.config)
781 781
782 782 for i in range(n):
783 783 t = IPEngineTask(config=self.config)
784 784 # The task's work directory is *not* the actual work directory of
785 785 # the engine. It is used as the base path for the stdout/stderr
786 786 # files that the scheduler redirects to.
787 787 t.work_directory = self.profile_dir
788 788 # Add the cluster args (profile_dir, cluster_id) and any extra engine args.
789 789 t.engine_args.extend(self.cluster_args)
790 790 t.engine_args.extend(self.engine_args)
791 791 job.add_task(t)
792 792
793 793 self.log.info("Writing job description file: %s" % self.job_file)
794 794 job.write(self.job_file)
795 795
796 796 @property
797 797 def job_file(self):
798 798 return os.path.join(self.profile_dir, self.job_file_name)
799 799
800 800 def start(self, n):
801 801 """Start the controller by profile_dir."""
802 802 return super(WindowsHPCEngineSetLauncher, self).start(n)
803 803
804 804
805 805 #-----------------------------------------------------------------------------
806 806 # Batch (PBS) system launchers
807 807 #-----------------------------------------------------------------------------
808 808
809 809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates context dict, rather than args"""
811 context = Dict({'profile_dir':'', 'cluster_id':''})
810 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
812 811 def _profile_dir_changed(self, name, old, new):
813 812 self.context[name] = new
814 813 _cluster_id_changed = _profile_dir_changed
815 814
815 def _profile_dir_default(self):
816 self.context['profile_dir'] = ''
817 return ''
818 def _cluster_id_default(self):
819 self.context['cluster_id'] = ''
820 return ''
821
822
816 823 class BatchSystemLauncher(BaseLauncher):
817 824 """Launch an external process using a batch system.
818 825
819 826 This class is designed to work with UNIX batch systems like PBS, LSF,
820 827 GridEngine, etc. The overall model is that there are different commands
821 828 like qsub, qdel, etc. that handle the starting and stopping of the process.
822 829
823 830 This class also has the notion of a batch script. The ``batch_template``
824 831 attribute can be set to a string that is a template for the batch script.
825 832 This template is instantiated using string formatting. Thus the template can
826 833 use {n} for the number of instances. Subclasses can add additional variables
827 834 to the template dict (a sample configuration follows this class below).
828 835 """
829 836
830 837 # Subclasses must fill these in. See PBSEngineSet
831 838 submit_command = List([''], config=True,
832 839 help="The name of the command line program used to submit jobs.")
833 840 delete_command = List([''], config=True,
834 841 help="The name of the command line program used to delete jobs.")
835 842 job_id_regexp = Unicode('', config=True,
836 843 help="""A regular expression used to get the job id from the output of the
837 844 submit_command.""")
838 845 batch_template = Unicode('', config=True,
839 846 help="The string that is the batch script template itself.")
840 847 batch_template_file = Unicode(u'', config=True,
841 848 help="The file that contains the batch template.")
842 849 batch_file_name = Unicode(u'batch_script', config=True,
843 850 help="The filename of the instantiated batch script.")
844 851 queue = Unicode(u'', config=True,
845 852 help="The PBS Queue.")
846 853
847 854 def _queue_changed(self, name, old, new):
848 855 self.context[name] = new
849 856
850 857 n = Int(1)
851 858 _n_changed = _queue_changed
852 859
853 860 # not configurable, override in subclasses
854 861 # PBS Job Array regex
855 862 job_array_regexp = Unicode('')
856 863 job_array_template = Unicode('')
857 864 # PBS Queue regex
858 865 queue_regexp = Unicode('')
859 866 queue_template = Unicode('')
860 867 # The default batch template, override in subclasses
861 868 default_template = Unicode('')
862 869 # The full path to the instantiated batch script.
863 870 batch_file = Unicode(u'')
864 871 # the format dict used with batch_template:
865 872 context = Dict()
866 873 # the Formatter instance for rendering the templates:
867 874 formatter = Instance(EvalFormatter, (), {})
868 875
869 876
870 877 def find_args(self):
871 878 return self.submit_command + [self.batch_file]
872 879
873 880 def __init__(self, work_dir=u'.', config=None, **kwargs):
874 881 super(BatchSystemLauncher, self).__init__(
875 882 work_dir=work_dir, config=config, **kwargs
876 883 )
877 884 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
878 885
879 886 def parse_job_id(self, output):
880 887 """Take the output of the submit command and return the job id."""
881 888 m = re.search(self.job_id_regexp, output)
882 889 if m is not None:
883 890 job_id = m.group()
884 891 else:
885 892 raise LauncherError("Job id couldn't be determined: %s" % output)
886 893 self.job_id = job_id
887 894 self.log.info('Job submitted with job id: %r' % job_id)
888 895 return job_id
889 896
890 897 def write_batch_script(self, n):
891 898 """Instantiate and write the batch script to the work_dir."""
892 899 self.n = n
893 900 # first priority is batch_template if set
894 901 if self.batch_template_file and not self.batch_template:
895 902 # second priority is batch_template_file
896 903 with open(self.batch_template_file) as f:
897 904 self.batch_template = f.read()
898 905 if not self.batch_template:
899 906 # third (last) priority is default_template
900 907 self.batch_template = self.default_template
901 908
902 909 # add job array or queue lines to the template,
903 910 # but only if the template does not already contain them
904 911 regex = re.compile(self.job_array_regexp)
905 912 # print regex.search(self.batch_template)
906 913 if not regex.search(self.batch_template):
907 914 self.log.info("adding job array settings to batch script")
908 915 firstline, rest = self.batch_template.split('\n',1)
909 916 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
910 917
911 918 regex = re.compile(self.queue_regexp)
912 919 # print regex.search(self.batch_template)
913 920 if self.queue and not regex.search(self.batch_template):
914 921 self.log.info("adding PBS queue settings to batch script")
915 922 firstline, rest = self.batch_template.split('\n',1)
916 923 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
917 924
918 925 script_as_string = self.formatter.format(self.batch_template, **self.context)
919 926 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
920 927
921 928 with open(self.batch_file, 'w') as f:
922 929 f.write(script_as_string)
923 930 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
924 931
925 932 def start(self, n):
926 933 """Start n copies of the process using a batch system."""
927 934 # Here we save profile_dir in the context so it
928 935 # can be used in the batch script template as {profile_dir}
929 936 self.write_batch_script(n)
930 937 output = check_output(self.args, env=os.environ)
931 938
932 939 job_id = self.parse_job_id(output)
933 940 self.notify_start(job_id)
934 941 return job_id
935 942
936 943 def stop(self):
937 944 output = check_output(self.delete_command+[self.job_id], env=os.environ)
938 945 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
939 946 return output
940 947
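As the class docstring notes, a user-supplied template is just a string rendered against the context dict, so {n}, {queue}, {profile_dir}, and {cluster_id} are all available. A PBS-flavored sketch (the queue name and script body are hypothetical):

    # illustrative ipcluster_config.py snippet
    c.PBSEngineSetLauncher.queue = 'shortq'    # hypothetical queue name
    c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
    #PBS -q {queue}
    #PBS -t 1-{n}
    ipengine --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
    """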
941 948
942 949 class PBSLauncher(BatchSystemLauncher):
943 950 """A BatchSystemLauncher subclass for PBS."""
944 951
945 952 submit_command = List(['qsub'], config=True,
946 953 help="The PBS submit command ['qsub']")
947 954 delete_command = List(['qdel'], config=True,
948 955 help="The PBS delete command ['qsub']")
949 956 job_id_regexp = Unicode(r'\d+', config=True,
950 957 help="Regular expresion for identifying the job ID [r'\d+']")
951 958
952 959 batch_file = Unicode(u'')
953 960 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
954 961 job_array_template = Unicode('#PBS -t 1-{n}')
955 962 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
956 963 queue_template = Unicode('#PBS -q {queue}')
957 964
958 965
959 class PBSControllerLauncher(BatchClusterAppMixin, PBSLauncher):
966 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
960 967 """Launch a controller using PBS."""
961 968
962 969 batch_file_name = Unicode(u'pbs_controller', config=True,
963 970 help="batch file name for the controller job.")
964 971 default_template= Unicode("""#!/bin/sh
965 972 #PBS -V
966 973 #PBS -N ipcontroller
967 974 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
968 975 """%(' '.join(ipcontroller_cmd_argv)))
969 976
970 977
971 978 def start(self):
972 979 """Start the controller by profile or profile_dir."""
973 980 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
974 981 return super(PBSControllerLauncher, self).start(1)
975 982
976 983
977 class PBSEngineSetLauncher(BatchClusterAppMixin, PBSLauncher):
984 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
978 985 """Launch Engines using PBS"""
979 986 batch_file_name = Unicode(u'pbs_engines', config=True,
980 987 help="batch file name for the engine(s) job.")
981 988 default_template= Unicode(u"""#!/bin/sh
982 989 #PBS -V
983 990 #PBS -N ipengine
984 991 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
985 992 """%(' '.join(ipengine_cmd_argv)))
986 993
987 994 def start(self, n):
988 995 """Start n engines by profile or profile_dir."""
989 996 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
990 997 return super(PBSEngineSetLauncher, self).start(n)
991 998
992 999 # SGE is very similar to PBS
993 1000
994 1001 class SGELauncher(PBSLauncher):
995 1002 """Sun GridEngine is a PBS clone with slightly different syntax"""
996 1003 job_array_regexp = Unicode('#\$\W+\-t')
997 1004 job_array_template = Unicode('#$ -t 1-{n}')
998 1005 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
999 1006 queue_template = Unicode('#$ -q {queue}')
1000 1007
1001 class SGEControllerLauncher(BatchClusterAppMixin, SGELauncher):
1008 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1002 1009 """Launch a controller using SGE."""
1003 1010
1004 1011 batch_file_name = Unicode(u'sge_controller', config=True,
1005 1012 help="batch file name for the ipontroller job.")
1006 1013 default_template= Unicode(u"""#$ -V
1007 1014 #$ -S /bin/sh
1008 1015 #$ -N ipcontroller
1009 1016 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1010 1017 """%(' '.join(ipcontroller_cmd_argv)))
1011 1018
1012 1019 def start(self):
1013 1020 """Start the controller by profile or profile_dir."""
1014 1021 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
1015 1022 return super(SGEControllerLauncher, self).start(1)
1016 1023
1017 class SGEEngineSetLauncher(BatchClusterAppMixin, SGELauncher):
1024 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1018 1025 """Launch Engines with SGE"""
1019 1026 batch_file_name = Unicode(u'sge_engines', config=True,
1020 1027 help="batch file name for the engine(s) job.")
1021 1028 default_template = Unicode("""#$ -V
1022 1029 #$ -S /bin/sh
1023 1030 #$ -N ipengine
1024 1031 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1025 1032 """%(' '.join(ipengine_cmd_argv)))
1026 1033
1027 1034 def start(self, n):
1028 1035 """Start n engines by profile or profile_dir."""
1029 1036 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1030 1037 return super(SGEEngineSetLauncher, self).start(n)
1031 1038
1032 1039
1033 1040 # LSF launchers
1034 1041
1035 1042 class LSFLauncher(BatchSystemLauncher):
1036 1043 """A BatchSystemLauncher subclass for LSF."""
1037 1044
1038 1045 submit_command = List(['bsub'], config=True,
1039 1046 help="The PBS submit command ['bsub']")
1040 1047 delete_command = List(['bkill'], config=True,
1041 1048 help="The PBS delete command ['bkill']")
1042 1049 job_id_regexp = Unicode(r'\d+', config=True,
1043 1050 help="Regular expresion for identifying the job ID [r'\d+']")
1044 1051
1045 1052 batch_file = Unicode(u'')
1046 1053 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1047 1054 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1048 1055 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1049 1056 queue_template = Unicode('#BSUB -q {queue}')
1050 1057
1051 1058 def start(self, n):
1052 1059 """Start n copies of the process using LSF batch system.
1053 1060 This can't inherit from the base class because bsub expects
1054 1061 to be piped a shell script in order to honor the #BSUB directives:
1055 1062 bsub < script
1056 1063 """
1057 1064 # Here we save profile_dir in the context so it
1058 1065 # can be used in the batch script template as {profile_dir}
1059 1066 self.write_batch_script(n)
1060 1067 #output = check_output(self.args, env=os.environ)
1061 1068 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1062 1069 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1063 1070 output,err = p.communicate()
1064 1071 job_id = self.parse_job_id(output)
1065 1072 self.notify_start(job_id)
1066 1073 return job_id
1067 1074
1068 1075
1069 class LSFControllerLauncher(BatchClusterAppMixin, LSFLauncher):
1076 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1070 1077 """Launch a controller using LSF."""
1071 1078
1072 1079 batch_file_name = Unicode(u'lsf_controller', config=True,
1073 1080 help="batch file name for the controller job.")
1074 1081 default_template= Unicode("""#!/bin/sh
1075 1082 #BSUB -J ipcontroller
1076 1083 #BSUB -oo ipcontroller.o.%%J
1077 1084 #BSUB -eo ipcontroller.e.%%J
1078 1085 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1079 1086 """%(' '.join(ipcontroller_cmd_argv)))
1080 1087
1081 1088 def start(self):
1082 1089 """Start the controller by profile or profile_dir."""
1083 1090 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1084 1091 return super(LSFControllerLauncher, self).start(1)
1085 1092
1086 1093
1087 class LSFEngineSetLauncher(BatchClusterAppMixin, LSFLauncher):
1094 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1088 1095 """Launch Engines using LSF"""
1089 1096 batch_file_name = Unicode(u'lsf_engines', config=True,
1090 1097 help="batch file name for the engine(s) job.")
1091 1098 default_template= Unicode(u"""#!/bin/sh
1092 1099 #BSUB -oo ipengine.o.%%J
1093 1100 #BSUB -eo ipengine.e.%%J
1094 1101 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1095 1102 """%(' '.join(ipengine_cmd_argv)))
1096 1103
1097 1104 def start(self, n):
1098 1105 """Start n engines by profile or profile_dir."""
1099 1106 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1100 1107 return super(LSFEngineSetLauncher, self).start(n)
1101 1108
1102 1109
1103 1110 #-----------------------------------------------------------------------------
1104 1111 # A launcher for ipcluster itself!
1105 1112 #-----------------------------------------------------------------------------
1106 1113
1107 1114
1108 1115 class IPClusterLauncher(LocalProcessLauncher):
1109 1116 """Launch the ipcluster program in an external process."""
1110 1117
1111 1118 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1112 1119 help="Popen command for ipcluster")
1113 1120 ipcluster_args = List(
1114 1121 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1115 1122 help="Command line arguments to pass to ipcluster.")
1116 1123 ipcluster_subcommand = Unicode('start')
1117 1124 ipcluster_n = Int(2)
1118 1125
1119 1126 def find_args(self):
1120 1127 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1121 1128 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1122 1129
1123 1130 def start(self):
1124 1131 self.log.info("Starting ipcluster: %r" % self.args)
1125 1132 return super(IPClusterLauncher, self).start()
1126 1133
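With the defaults above, the launcher's args resolve to the ipcluster start command. Roughly (ipcluster_cmd_argv is defined earlier in this module, so its exact value is not shown here):

    # find_args() returns approximately:
    #   ipcluster_cmd + ['start', '--n=2',
    #                    '--clean-logs', '--log-to-file', '--log-level=20']
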
1127 1134 #-----------------------------------------------------------------------------
1128 1135 # Collections of launchers
1129 1136 #-----------------------------------------------------------------------------
1130 1137
1131 1138 local_launchers = [
1132 1139 LocalControllerLauncher,
1133 1140 LocalEngineLauncher,
1134 1141 LocalEngineSetLauncher,
1135 1142 ]
1136 1143 mpi_launchers = [
1137 1144 MPIExecLauncher,
1138 1145 MPIExecControllerLauncher,
1139 1146 MPIExecEngineSetLauncher,
1140 1147 ]
1141 1148 ssh_launchers = [
1142 1149 SSHLauncher,
1143 1150 SSHControllerLauncher,
1144 1151 SSHEngineLauncher,
1145 1152 SSHEngineSetLauncher,
1146 1153 ]
1147 1154 winhpc_launchers = [
1148 1155 WindowsHPCLauncher,
1149 1156 WindowsHPCControllerLauncher,
1150 1157 WindowsHPCEngineSetLauncher,
1151 1158 ]
1152 1159 pbs_launchers = [
1153 1160 PBSLauncher,
1154 1161 PBSControllerLauncher,
1155 1162 PBSEngineSetLauncher,
1156 1163 ]
1157 1164 sge_launchers = [
1158 1165 SGELauncher,
1159 1166 SGEControllerLauncher,
1160 1167 SGEEngineSetLauncher,
1161 1168 ]
1162 1169 lsf_launchers = [
1163 1170 LSFLauncher,
1164 1171 LSFControllerLauncher,
1165 1172 LSFEngineSetLauncher,
1166 1173 ]
1167 1174 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1168 1175 + pbs_launchers + sge_launchers + lsf_launchers
1169 1176