##// END OF EJS Templates
add cluster_id to parallel apps...
MinRK -
Show More
@@ -1,244 +1,257 b''
1 1 # encoding: utf-8
2 2 """
3 3 The Base Application class for IPython.parallel apps
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * Min RK
9 9
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 from __future__ import with_statement
24 24
25 25 import os
26 26 import logging
27 27 import re
28 28 import sys
29 29
30 30 from subprocess import Popen, PIPE
31 31
32 32 from IPython.core import release
33 33 from IPython.core.crashhandler import CrashHandler
34 34 from IPython.core.application import (
35 35 BaseIPythonApplication,
36 36 base_aliases as base_ip_aliases,
37 37 base_flags as base_ip_flags
38 38 )
39 39 from IPython.utils.path import expand_path
40 40
41 41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Module errors
45 45 #-----------------------------------------------------------------------------
46 46
class PIDFileError(Exception):
    """Raised when a pid file already exists, is missing, or is unparseable."""
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Crash handler for this application
53 53 #-----------------------------------------------------------------------------
54 54
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for the parallel apps: writes a detailed crash report to disk."""

    def __init__(self, app):
        # Route crash reports to MinRK and the GitHub issue tracker.
        contact = release.authors['Min']
        super(ParallelCrashHandler, self).__init__(
            app, contact[0], contact[1],
            'http://github.com/ipython/ipython/issues'
        )
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Main application
69 69 #-----------------------------------------------------------------------------
# Command-line aliases shared by every app built on BaseParallelApplication:
# start from the common IPython app aliases, then add the parallel-specific
# ones (including the new 'cluster-id' introduced by this change).
base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
    'profile-dir' : 'ProfileDir.location',
    'work-dir' : 'BaseParallelApplication.work_dir',
    'log-to-file' : 'BaseParallelApplication.log_to_file',
    'clean-logs' : 'BaseParallelApplication.clean_logs',
    'log-url' : 'BaseParallelApplication.log_url',
    'cluster-id' : 'BaseParallelApplication.cluster_id',
})

# Command-line flags shared by every parallel app; the shared IPython base
# flags are merged in afterwards.
base_flags = {
    'log-to-file' : (
        {'BaseParallelApplication' : {'log_to_file' : True}},
        "send log output to a file"
    )
}
base_flags.update(base_ip_flags)
87 88
class BaseParallelApplication(BaseIPythonApplication):
    """The base Application for IPython.parallel apps

    Principal extensions to BaseIPythonApplication:

    * work_dir
    * remote logging via pyzmq
    * IOLoop instance
    """

    # Use the parallel-specific crash handler defined above.
    crash_handler_class = ParallelCrashHandler

    def _log_level_default(self):
        # temporarily override default_log_level to INFO
        return logging.INFO

    work_dir = Unicode(os.getcwdu(), config=True,
        help='Set the working dir for the process.'
    )
    def _work_dir_changed(self, name, old, new):
        # expand '~' and env vars as soon as the trait is assigned
        self.work_dir = unicode(expand_path(new))

    log_to_file = Bool(config=True,
        help="whether to log to a file")

    clean_logs = Bool(False, config=True,
        help="whether to cleanup old logfiles before starting")

    log_url = Unicode('', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")

    cluster_id = Unicode('', config=True,
        help="""String id to add to runtime files, to prevent name collisions when
        using multiple clusters with a single profile.

        When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
        """
    )
    def _cluster_id_changed(self, name, old, new):
        # Runtime file names (logs, pid files, connection files) are derived
        # from self.name, so fold the cluster_id into it:
        # '<appname>-<cluster_id>'.  Resetting from the class attribute first
        # makes repeated assignments idempotent.
        self.name = self.__class__.name
        if new:
            self.name += '-%s'%new

    def _config_files_default(self):
        # the three parallel apps all load any of these config files
        return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']

    # zmq IOLoop, created lazily so importing this module does not require zmq
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    aliases = Dict(base_aliases)
    flags = Dict(base_flags)

    def initialize(self, argv=None):
        """initialize the app"""
        super(BaseParallelApplication, self).initialize(argv)
        self.to_work_dir()
        self.reinit_logging()

    def to_work_dir(self):
        """chdir to self.work_dir if it is not already the cwd."""
        wd = self.work_dir
        if unicode(wd) != os.getcwdu():
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)
        # This is the working dir by now.
        sys.path.insert(0, '')

    def reinit_logging(self):
        """Optionally clean old logfiles and redirect logging to a per-pid file."""
        # Remove old log files
        log_dir = self.profile_dir.log_dir
        if self.clean_logs:
            for f in os.listdir(log_dir):
                if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
                    os.remove(os.path.join(log_dir, f))
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            # swap the handler installed by the base app for one that writes
            # to the logfile
            self.log.removeHandler(self._log_handler)
            self._log_handler = logging.StreamHandler(open_log_file)
            self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
            self._log_handler.setFormatter(self._log_formatter)
            self.log.addHandler(self._log_handler)
        # do not propagate log messages to root logger
        # ipcluster app will sometimes print duplicate messages during shutdown
        # if this is 1 (default):
        self.log.propagate = False

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after pre_construct, which sets `self.pid_dir`.
        This raises :exc:`PIDFileError` if the pid file exists already.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            pid = self.get_pid_from_file()
            if not overwrite:
                raise PIDFileError(
                    'The pid file [%s] already exists. \nThis could mean that this '
                    'server is already running with [pid=%s].' % (pid_file, pid)
                )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file.

        This should be called at shutdown by registering a callback with
        :func:`reactor.addSystemEventTrigger`. This needs to return
        ``None``.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except:
                # best effort: failure to remove only warrants a warning
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                s = f.read().strip()
                try:
                    pid = int(s)
                except:
                    raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
                return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)

    def check_pid(self, pid):
        """Return True if a process with `pid` appears to be running.

        When the check itself fails, assumes the process IS running and
        returns True (the conservative answer for callers deciding whether
        a pid file is stale).
        """
        if os.name == 'nt':
            try:
                import ctypes
                # returns 0 if no such process (of ours) exists
                # positive int otherwise
                p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
            except Exception:
                self.log.warn(
                    "Could not determine whether pid %i is running via `OpenProcess`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            return bool(p)
        else:
            try:
                p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
                output,_ = p.communicate()
            except OSError:
                self.log.warn(
                    "Could not determine whether pid %i is running via `ps x`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            # first column of each `ps x` output line is the pid
            pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
            return pid in pids
@@ -1,441 +1,452 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import socket
28 28 import stat
29 29 import sys
30 30 import uuid
31 31
32 32 from multiprocessing import Process
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37 from zmq.utils import jsonapi as json
38 38
39 39 from IPython.config.application import boolean_flag
40 40 from IPython.core.profiledir import ProfileDir
41 41
42 42 from IPython.parallel.apps.baseapp import (
43 43 BaseParallelApplication,
44 44 base_aliases,
45 45 base_flags,
46 46 )
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
49 49
50 50 # from IPython.parallel.controller.controller import ControllerFactory
51 51 from IPython.zmq.session import Session
52 52 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 53 from IPython.parallel.controller.hub import HubFactory
54 54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 56
57 57 from IPython.parallel.util import signal_children, split_url, asbytes
58 58
59 59 # conditional import of MongoDB backend class
60 60
61 61 try:
62 62 from IPython.parallel.controller.mongodb import MongoDB
63 63 except ImportError:
64 64 maybe_mongo = []
65 65 else:
66 66 maybe_mongo = [MongoDB]
67 67
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Module level variables
71 71 #-----------------------------------------------------------------------------
72 72
73 73
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


# --help description for the ipcontroller app
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "profile_name". See the `profile`
and `profile-dir` options for details.
"""

# usage examples shown by --help
_examples = """
ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
ipcontroller --scheme=pure # use the pure zeromq scheduler
"""


#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
# ipcontroller-specific flags, layered on top of the shared parallel-app flags
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
        'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
        'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
        'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
        'use the in-memory DictDB backend'),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
        'reuse existing json connection files')
})

# --secure / --no-secure toggle HMAC message authentication
flags.update(boolean_flag('secure', 'IPControllerApp.secure',
    "Use HMAC digests for authentication of messages.",
    "Don't authenticate messages."
))
# ipcontroller-specific aliases, merged with the shared parallel-app aliases
aliases = dict(
    secure = 'IPControllerApp.secure',
    ssh = 'IPControllerApp.ssh_server',
    enginessh = 'IPControllerApp.engine_ssh_server',
    location = 'IPControllerApp.location',

    ident = 'Session.session',
    user = 'Session.username',
    keyfile = 'Session.keyfile',

    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',

    ping = 'HeartMonitor.period',

    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
137 137
138 138
class IPControllerApp(BaseParallelApplication):
    """The ipcontroller application: starts the Hub, schedulers, and queues."""

    name = u'ipcontroller'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files.'
    )
    secure = Bool(True, config=True,
        help='Whether to use HMAC digests for extra message authentication.'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    engine_ssh_server = Unicode(u'', config=True,
        help="""ssh url for engines to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    engine_json_file = Unicode('ipcontroller-engine.json', config=True,
        help="JSON filename where engine connection info will be stored.")
    client_json_file = Unicode('ipcontroller-client.json', config=True,
        help="JSON filename where client connection info will be stored.")

    def _cluster_id_changed(self, name, old, new):
        # Keep the connection-file names in sync with the cluster-id-suffixed
        # app name computed by the base class.
        super(IPControllerApp, self)._cluster_id_changed(name, old, new)
        self.engine_json_file = "%s-engine.json"%self.name
        self.client_json_file = "%s-client.json"%self.name


    # internal
    children = List()
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _use_threads_changed(self, name, old, new):
        # the schedulers run in threads or subprocesses depending on this flag
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    aliases = Dict(aliases)
    flags = Dict(flags)


    def save_connection_dict(self, fname, cdict):
        """save a connection dict to json file."""
        c = self.config
        url = cdict['url']
        location = cdict['location']
        if not location:
            # no explicit location: try to guess this machine's external IP
            try:
                proto,ip,port = split_url(url)
            except AssertionError:
                pass
            else:
                try:
                    location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
                except (socket.gaierror, IndexError):
                    self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
                    " You may need to specify '--location=<external_ip_address>' to help"
                    " IPython decide when to connect via loopback.")
                    location = '127.0.0.1'
        cdict['location'] = location
        fname = os.path.join(self.profile_dir.security_dir, fname)
        with open(fname, 'wb') as f:
            f.write(json.dumps(cdict, indent=2))
        # connection files contain the exec_key, so restrict them to the owner
        os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)

    def load_config_from_json(self):
        """load config from existing json connector files."""
        c = self.config
        # load from engine config
        with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
            cfg = json.loads(f.read())
        key = c.Session.key = asbytes(cfg['exec_key'])
        xport,addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        self.location = cfg['location']
        if not self.engine_ssh_server:
            self.engine_ssh_server = cfg['ssh']
        # load client config
        with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
            cfg = json.loads(f.read())
        # the two files must agree on the key and registration port
        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
        xport,addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.client_ip = ip
        if not self.ssh_server:
            self.ssh_server = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"

    def init_hub(self):
        """Construct the HubFactory, reusing or generating connection info."""
        c = self.config

        self.do_import_statements()
        reusing = self.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError,IOError):
                # stale or missing files: fall through and generate fresh ones
                reusing=False
        # check again, because reusing may have failed:
        if reusing:
            pass
        elif self.secure:
            key = str(uuid.uuid4())
            # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
            # with open(keyfile, 'w') as f:
            #     f.write(key)
            # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
            c.Session.key = asbytes(key)
        else:
            key = c.Session.key = b''

        try:
            self.factory = HubFactory(config=c, log=self.log)
            # self.start_logging()
            self.factory.init_hub()
        except:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : key,
                    'ssh' : self.ssh_server,
                    'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                    'location' : self.location
                    }
            self.save_connection_dict(self.client_json_file, cdict)
            # NOTE(review): edict aliases cdict, so the updates below also
            # mutate cdict -- harmless here since cdict was already saved,
            # but a copy would be safer.
            edict = cdict
            edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
            edict['ssh'] = self.engine_ssh_server
            self.save_connection_dict(self.engine_json_file, edict)

    #
    def init_schedulers(self):
        """Start the IOPub relay, MUX/control queues, and the task scheduler."""
        children = self.children
        mq = import_item(str(self.mq_class))

        hub = self.factory
        # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
        q.bind_in(hub.client_info['iopub'])
        q.bind_out(hub.engine_info['iopub'])
        q.setsockopt_out(zmq.SUBSCRIBE, b'')
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
        q.bind_in(hub.client_info['mux'])
        q.setsockopt_in(zmq.IDENTITY, b'mux')
        q.bind_out(hub.engine_info['mux'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
        q.bind_in(hub.client_info['control'])
        q.setsockopt_in(zmq.IDENTITY, b'control')
        q.bind_out(hub.engine_info['control'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            scheme = TaskScheduler.scheme_name.get_default_value()
        # Task Queue (in a Process)
        if scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
            # q.setsockopt_out(zmq.HWM, hub.hwm)
            q.bind_in(hub.client_info['task'][1])
            q.setsockopt_in(zmq.IDENTITY, b'task')
            q.bind_out(hub.engine_info['task'])
            q.connect_mon(hub.monitor_url)
            q.daemon=True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")

        else:
            self.log.info("task::using Python %s Task scheduler"%scheme)
            sargs = (hub.client_info['task'][1], hub.engine_info['task'],
                                hub.monitor_url, hub.client_info['notification'])
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                            log_url = self.log_url, config=dict(self.config))
            if 'Process' in self.mq_class:
                # run the Python scheduler in a Process
                q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
                q.daemon=True
                children.append(q)
            else:
                # single-threaded Controller
                kwargs['in_thread'] = True
                launch_scheduler(*sargs, **kwargs)


    def save_urls(self):
        """save the registration urls to files."""
        c = self.config

        sec_dir = self.profile_dir.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))


    def do_import_statements(self):
        """exec each configured import statement in this module's namespace."""
        statements = self.import_statements
        for s in statements:
            try:
                # NOTE(review): stdlib Logger has no `msg` method; these two
                # calls look like they should be self.log.info / .error --
                # confirm against the logger class actually used here.
                self.log.msg("Executing statement: '%s'" % s)
                exec s in globals(), locals()
            except:
                self.log.msg("Error running statement: %s" % s)

    def forward_logging(self):
        """If a log_url is configured, forward log records to it over zmq PUB."""
        if self.log_url:
            self.log.info("Forwarding logging to %s"%self.log_url)
            context = zmq.Context.instance()
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            handler = PUBHandler(lsock)
            self.log.removeHandler(self._log_handler)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)
            self._log_handler = handler

    def initialize(self, argv=None):
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.init_hub()
        self.init_schedulers()

    def start(self):
        # Start the subprocesses:
        self.factory.start()
        child_procs = []
        for child in self.children:
            child.start()
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            # forward signals (e.g. SIGINT) to the scheduler subprocesses
            signal_children(child_procs)

        self.write_pid_file(overwrite=True)

        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
417 428
418 429
def launch_new_instance():
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # Guard against being re-invoked from a multiprocessing subprocess:
        # Windows has no real fork, so a naive setuptools entry point can
        # otherwise spawn Controllers recursively.  Only the process named
        # 'MainProcess' may start the app; subprocesses bail out here.
        # (Only an issue with vanilla setuptools installs, not distribute.)
        import multiprocessing
        if multiprocessing.current_process().name != 'MainProcess':
            return
    app = IPControllerApp.instance()
    app.initialize()
    app.start()


if __name__ == '__main__':
    launch_new_instance()
@@ -1,336 +1,344 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 )
38 38 from IPython.zmq.log import EnginePUBHandler
39 39
40 40 from IPython.config.configurable import Configurable
41 41 from IPython.zmq.session import Session
42 42 from IPython.parallel.engine.engine import EngineFactory
43 43 from IPython.parallel.engine.streamkernel import Kernel
44 44 from IPython.parallel.util import disambiguate_url, asbytes
45 45
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Module level variables
52 52 #-----------------------------------------------------------------------------
53 53
#: The default config file name for this application
default_config_file_name = u'ipengine_config.py'

#: Long-form help shown by `ipengine --help`.
_description = """Start an IPython engine for parallel computing.

IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "profile_name".
See the `profile` and `profile-dir` options for details.
"""

#: Example invocations appended to the help output.
_examples = """
ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
"""

#-----------------------------------------------------------------------------
# MPI configuration
#-----------------------------------------------------------------------------

# Init snippet exec'd (via MPI.default_inits / init_mpi) when use == 'mpi4py'.
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""

# Init snippet for PyTrilinos: exposes a minimal mpi-like stub object.
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
    pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
89 89
class MPI(Configurable):
    """Configurable for MPI initialization"""
    use = Unicode('', config=True,
        help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
    )

    def _use_changed(self, name, old, new):
        # FIX: renamed from `_on_use_changed(self, old, new)`.  Traitlets only
        # auto-registers change handlers named `_<trait>_changed`, so the old
        # name never fired (and its 3-arg signature would have been
        # misinterpreted as (name, old) anyway).
        # load default init script if it's not set
        if not self.init_script:
            self.init_script = self.default_inits.get(new, '')

    init_script = Unicode('', config=True,
        help="Initialization code for MPI")

    default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
        config=True)
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 109 # Main application
110 110 #-----------------------------------------------------------------------------
# Command-line aliases: short option name -> fully-qualified config target.
aliases = dict(
    file = 'IPEngineApp.url_file',
    c = 'IPEngineApp.startup_command',
    s = 'IPEngineApp.startup_script',

    # Session (security/identity) options
    ident = 'Session.session',
    user = 'Session.username',
    keyfile = 'Session.keyfile',

    # connection options for the EngineFactory
    url = 'EngineFactory.url',
    ssh = 'EngineFactory.sshserver',
    sshkey = 'EngineFactory.sshkey',
    ip = 'EngineFactory.ip',
    transport = 'EngineFactory.transport',
    port = 'EngineFactory.regport',
    location = 'EngineFactory.location',

    timeout = 'EngineFactory.timeout',

    mpi = 'MPI.use',

)
# inherit the aliases common to all parallel apps (profile, log-level, ...)
aliases.update(base_aliases)
134 134
135 135
class IPEngineApp(BaseParallelApplication):
    """The ipengine application: run one engine and connect it to a controller."""

    name = 'ipengine'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])

    # optional code to run once the engine is up
    startup_script = Unicode(u'', config=True,
        help='specify a script to be run at startup')
    startup_command = Unicode('', config=True,
        help='specify a command to be run at startup')

    url_file = Unicode(u'', config=True,
        help="""The full location of the file containing the connection information for
        the controller. If this is not given, the file must be in the
        security directory of the cluster directory. This location is
        resolved using the `profile` or `profile_dir` options.""",
        )
    wait_for_url_file = Float(5, config=True,
        help="""The maximum number of seconds to wait for url_file to exist.
        This is useful for batch-systems and shared-filesystems where the
        controller and engine are started at the same time and it
        may take a moment for the controller to write the connector files.""")

    url_file_name = Unicode(u'ipcontroller-engine.json', config=True)

    def _cluster_id_changed(self, name, old, new):
        # keep the default connector-file name in sync with cluster_id:
        # 'ipcontroller-engine.json' or 'ipcontroller-<id>-engine.json'
        base = 'ipcontroller-%s'%new if new else 'ipcontroller'
        self.url_file_name = "%s-engine.json"%base

    log_url = Unicode('', config=True,
        help="""The URL for the iploggerapp instance, for forwarding
        logging to a central location.""")

    aliases = Dict(aliases)
167 175
168 176 # def find_key_file(self):
169 177 # """Set the key file.
170 178 #
    # Here we don't try to actually see if it exists or is valid as that
    # is handled by the connection logic.
173 181 # """
174 182 # config = self.master_config
175 183 # # Find the actual controller key file
176 184 # if not config.Global.key_file:
177 185 # try_this = os.path.join(
178 186 # config.Global.profile_dir,
179 187 # config.Global.security_dir,
180 188 # config.Global.key_file_name
181 189 # )
182 190 # config.Global.key_file = try_this
183 191
184 192 def find_url_file(self):
185 193 """Set the url file.
186 194
187 195 Here we don't try to actually see if it exists for is valid as that
188 196 is hadled by the connection logic.
189 197 """
190 198 config = self.config
191 199 # Find the actual controller key file
192 200 if not self.url_file:
193 201 self.url_file = os.path.join(
194 202 self.profile_dir.security_dir,
195 203 self.url_file_name
196 204 )
197 205
198 206 def load_connector_file(self):
199 207 """load config from a JSON connector file,
200 208 at a *lower* priority than command-line/config files.
201 209 """
202 210
203 211 self.log.info("Loading url_file %r"%self.url_file)
204 212 config = self.config
205 213
206 214 with open(self.url_file) as f:
207 215 d = json.loads(f.read())
208 216
209 217 try:
210 218 config.Session.key
211 219 except AttributeError:
212 220 if d['exec_key']:
213 221 config.Session.key = asbytes(d['exec_key'])
214 222
215 223 try:
216 224 config.EngineFactory.location
217 225 except AttributeError:
218 226 config.EngineFactory.location = d['location']
219 227
220 228 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
221 229 try:
222 230 config.EngineFactory.url
223 231 except AttributeError:
224 232 config.EngineFactory.url = d['url']
225 233
226 234 try:
227 235 config.EngineFactory.sshserver
228 236 except AttributeError:
229 237 config.EngineFactory.sshserver = d['ssh']
230 238
231 239 def init_engine(self):
232 240 # This is the working dir by now.
233 241 sys.path.insert(0, '')
234 242 config = self.config
235 243 # print config
236 244 self.find_url_file()
237 245
238 246 # was the url manually specified?
239 247 keys = set(self.config.EngineFactory.keys())
240 248 keys = keys.union(set(self.config.RegistrationFactory.keys()))
241 249
242 250 if keys.intersection(set(['ip', 'url', 'port'])):
243 251 # Connection info was specified, don't wait for the file
244 252 url_specified = True
245 253 self.wait_for_url_file = 0
246 254 else:
247 255 url_specified = False
248 256
249 257 if self.wait_for_url_file and not os.path.exists(self.url_file):
250 258 self.log.warn("url_file %r not found"%self.url_file)
251 259 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
252 260 tic = time.time()
253 261 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
254 262 # wait for url_file to exist, for up to 10 seconds
255 263 time.sleep(0.1)
256 264
257 265 if os.path.exists(self.url_file):
258 266 self.load_connector_file()
259 267 elif not url_specified:
260 268 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
261 269 self.exit(1)
262 270
263 271
264 272 try:
265 273 exec_lines = config.Kernel.exec_lines
266 274 except AttributeError:
267 275 config.Kernel.exec_lines = []
268 276 exec_lines = config.Kernel.exec_lines
269 277
270 278 if self.startup_script:
271 279 enc = sys.getfilesystemencoding() or 'utf8'
272 280 cmd="execfile(%r)"%self.startup_script.encode(enc)
273 281 exec_lines.append(cmd)
274 282 if self.startup_command:
275 283 exec_lines.append(self.startup_command)
276 284
277 285 # Create the underlying shell class and Engine
278 286 # shell_class = import_item(self.master_config.Global.shell_class)
279 287 # print self.config
280 288 try:
281 289 self.engine = EngineFactory(config=config, log=self.log)
282 290 except:
283 291 self.log.error("Couldn't start the Engine", exc_info=True)
284 292 self.exit(1)
285 293
286 294 def forward_logging(self):
287 295 if self.log_url:
288 296 self.log.info("Forwarding logging to %s"%self.log_url)
289 297 context = self.engine.context
290 298 lsock = context.socket(zmq.PUB)
291 299 lsock.connect(self.log_url)
292 300 self.log.removeHandler(self._log_handler)
293 301 handler = EnginePUBHandler(self.engine, lsock)
294 302 handler.setLevel(self.log_level)
295 303 self.log.addHandler(handler)
296 304 self._log_handler = handler
297 305
298 306 def init_mpi(self):
299 307 global mpi
300 308 self.mpi = MPI(config=self.config)
301 309
302 310 mpi_import_statement = self.mpi.init_script
303 311 if mpi_import_statement:
304 312 try:
305 313 self.log.info("Initializing MPI:")
306 314 self.log.info(mpi_import_statement)
307 315 exec mpi_import_statement in globals()
308 316 except:
309 317 mpi = None
310 318 else:
311 319 mpi = None
312 320
    def initialize(self, argv=None):
        """Parse the command line (base class), then init MPI, the engine,
        and log forwarding.  forward_logging() must run after init_engine()
        because it uses the zmq context the engine creates."""
        super(IPEngineApp, self).initialize(argv)
        self.init_mpi()
        self.init_engine()
        self.forward_logging()
318 326
    def start(self):
        """Start the engine, then block in its event loop until interrupted
        (Ctrl-C logs a shutdown message and returns)."""
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")
325 333
326 334
def launch_new_instance():
    """Create and run the IPython engine"""
    # singleton app: configure from argv, then run until interrupted
    engine_app = IPEngineApp.instance()
    engine_app.initialize()
    engine_app.start()


if __name__ == '__main__':
    launch_new_instance()
336 344
General Comments 0
You need to be logged in to leave comments. Login now