##// END OF EJS Templates
Backport PR #5805: fix engine startup files...
Thomas Kluyver -
Show More
@@ -1,394 +1,397 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 catch_config_error,
38 38 )
39 39 from IPython.kernel.zmq.log import EnginePUBHandler
40 40 from IPython.kernel.zmq.ipkernel import Kernel
41 41 from IPython.kernel.zmq.kernelapp import IPKernelApp
42 42 from IPython.kernel.zmq.session import (
43 43 Session, session_aliases, session_flags
44 44 )
45 45 from IPython.kernel.zmq.zmqshell import ZMQInteractiveShell
46 46
47 47 from IPython.config.configurable import Configurable
48 48
49 49 from IPython.parallel.engine.engine import EngineFactory
50 50 from IPython.parallel.util import disambiguate_ip_address
51 51
52 52 from IPython.utils.importstring import import_item
53 53 from IPython.utils.py3compat import cast_bytes
54 54 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
55 55
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # Module level variables
59 59 #-----------------------------------------------------------------------------
60 60
61 61 _description = """Start an IPython engine for parallel computing.
62 62
63 63 IPython engines run in parallel and perform computations on behalf of a client
64 64 and controller. A controller needs to be started before the engines. The
65 65 engine can be configured using command line options or using a cluster
66 66 directory. Cluster directories contain config, log and security files and are
67 67 usually located in your ipython directory and named as "profile_name".
68 68 See the `profile` and `profile-dir` options for details.
69 69 """
70 70
71 71 _examples = """
72 72 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
73 73 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
74 74 """
75 75
76 76 #-----------------------------------------------------------------------------
77 77 # MPI configuration
78 78 #-----------------------------------------------------------------------------
79 79
80 80 mpi4py_init = """from mpi4py import MPI as mpi
81 81 mpi.size = mpi.COMM_WORLD.Get_size()
82 82 mpi.rank = mpi.COMM_WORLD.Get_rank()
83 83 """
84 84
85 85
86 86 pytrilinos_init = """from PyTrilinos import Epetra
87 87 class SimpleStruct:
88 88 pass
89 89 mpi = SimpleStruct()
90 90 mpi.rank = 0
91 91 mpi.size = 0
92 92 """
93 93
94 94 class MPI(Configurable):
95 95 """Configurable for MPI initialization"""
96 96 use = Unicode('', config=True,
97 97 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
98 98 )
99 99
100 100 def _use_changed(self, name, old, new):
101 101 # load default init script if it's not set
102 102 if not self.init_script:
103 103 self.init_script = self.default_inits.get(new, '')
104 104
105 105 init_script = Unicode('', config=True,
106 106 help="Initialization code for MPI")
107 107
108 108 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
109 109 config=True)
110 110
111 111
112 112 #-----------------------------------------------------------------------------
113 113 # Main application
114 114 #-----------------------------------------------------------------------------
115 115 aliases = dict(
116 116 file = 'IPEngineApp.url_file',
117 117 c = 'IPEngineApp.startup_command',
118 118 s = 'IPEngineApp.startup_script',
119 119
120 120 url = 'EngineFactory.url',
121 121 ssh = 'EngineFactory.sshserver',
122 122 sshkey = 'EngineFactory.sshkey',
123 123 ip = 'EngineFactory.ip',
124 124 transport = 'EngineFactory.transport',
125 125 port = 'EngineFactory.regport',
126 126 location = 'EngineFactory.location',
127 127
128 128 timeout = 'EngineFactory.timeout',
129 129
130 130 mpi = 'MPI.use',
131 131
132 132 )
133 133 aliases.update(base_aliases)
134 134 aliases.update(session_aliases)
135 135 flags = {}
136 136 flags.update(base_flags)
137 137 flags.update(session_flags)
138 138
139 139 class IPEngineApp(BaseParallelApplication):
140 140
141 141 name = 'ipengine'
142 142 description = _description
143 143 examples = _examples
144 144 classes = List([ZMQInteractiveShell, ProfileDir, Session, EngineFactory, Kernel, MPI])
145 145
146 146 startup_script = Unicode(u'', config=True,
147 147 help='specify a script to be run at startup')
148 148 startup_command = Unicode('', config=True,
149 149 help='specify a command to be run at startup')
150 150
151 151 url_file = Unicode(u'', config=True,
152 152 help="""The full location of the file containing the connection information for
153 153 the controller. If this is not given, the file must be in the
154 154 security directory of the cluster directory. This location is
155 155 resolved using the `profile` or `profile_dir` options.""",
156 156 )
157 157 wait_for_url_file = Float(5, config=True,
158 158 help="""The maximum number of seconds to wait for url_file to exist.
159 159 This is useful for batch-systems and shared-filesystems where the
160 160 controller and engine are started at the same time and it
161 161 may take a moment for the controller to write the connector files.""")
162 162
163 163 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
164 164
165 165 def _cluster_id_changed(self, name, old, new):
166 166 if new:
167 167 base = 'ipcontroller-%s' % new
168 168 else:
169 169 base = 'ipcontroller'
170 170 self.url_file_name = "%s-engine.json" % base
171 171
172 172 log_url = Unicode('', config=True,
173 173 help="""The URL for the iploggerapp instance, for forwarding
174 174 logging to a central location.""")
175 175
176 176 # an IPKernelApp instance, used to setup listening for shell frontends
177 177 kernel_app = Instance(IPKernelApp)
178 178
179 179 aliases = Dict(aliases)
180 180 flags = Dict(flags)
181 181
182 182 @property
183 183 def kernel(self):
184 184 """allow access to the Kernel object, so I look like IPKernelApp"""
185 185 return self.engine.kernel
186 186
187 187 def find_url_file(self):
188 188 """Set the url file.
189 189
190 190 Here we don't try to actually see if it exists for is valid as that
191 191 is hadled by the connection logic.
192 192 """
193 193 config = self.config
194 194 # Find the actual controller key file
195 195 if not self.url_file:
196 196 self.url_file = os.path.join(
197 197 self.profile_dir.security_dir,
198 198 self.url_file_name
199 199 )
200 200
201 201 def load_connector_file(self):
202 202 """load config from a JSON connector file,
203 203 at a *lower* priority than command-line/config files.
204 204 """
205 205
206 206 self.log.info("Loading url_file %r", self.url_file)
207 207 config = self.config
208 208
209 209 with open(self.url_file) as f:
210 210 num_tries = 0
211 211 max_tries = 5
212 212 d = ""
213 213 while not d:
214 214 try:
215 215 d = json.loads(f.read())
216 216 except ValueError:
217 217 if num_tries > max_tries:
218 218 raise
219 219 num_tries += 1
220 220 time.sleep(0.5)
221 221
222 222 # allow hand-override of location for disambiguation
223 223 # and ssh-server
224 224 if 'EngineFactory.location' not in config:
225 225 config.EngineFactory.location = d['location']
226 226 if 'EngineFactory.sshserver' not in config:
227 227 config.EngineFactory.sshserver = d.get('ssh')
228 228
229 229 location = config.EngineFactory.location
230 230
231 231 proto, ip = d['interface'].split('://')
232 232 ip = disambiguate_ip_address(ip, location)
233 233 d['interface'] = '%s://%s' % (proto, ip)
234 234
235 235 # DO NOT allow override of basic URLs, serialization, or key
236 236 # JSON file takes top priority there
237 237 config.Session.key = cast_bytes(d['key'])
238 238 config.Session.signature_scheme = d['signature_scheme']
239 239
240 240 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
241 241
242 242 config.Session.packer = d['pack']
243 243 config.Session.unpacker = d['unpack']
244 244
245 245 self.log.debug("Config changed:")
246 246 self.log.debug("%r", config)
247 247 self.connection_info = d
248 248
249 249 def bind_kernel(self, **kwargs):
250 250 """Promote engine to listening kernel, accessible to frontends."""
251 251 if self.kernel_app is not None:
252 252 return
253 253
254 254 self.log.info("Opening ports for direct connections as an IPython kernel")
255 255
256 256 kernel = self.kernel
257 257
258 258 kwargs.setdefault('config', self.config)
259 259 kwargs.setdefault('log', self.log)
260 260 kwargs.setdefault('profile_dir', self.profile_dir)
261 261 kwargs.setdefault('session', self.engine.session)
262 262
263 263 app = self.kernel_app = IPKernelApp(**kwargs)
264 264
265 265 # allow IPKernelApp.instance():
266 266 IPKernelApp._instance = app
267 267
268 268 app.init_connection_file()
269 269 # relevant contents of init_sockets:
270 270
271 271 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
272 272 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
273 273
274 274 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
275 275 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
276 276
277 277 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
278 278 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
279 279 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
280 280
281 281 # start the heartbeat, and log connection info:
282 282
283 283 app.init_heartbeat()
284 284
285 285 app.log_connection_info()
286 286 app.write_connection_file()
287 287
288 288
289 289 def init_engine(self):
290 290 # This is the working dir by now.
291 291 sys.path.insert(0, '')
292 292 config = self.config
293 293 # print config
294 294 self.find_url_file()
295 295
296 296 # was the url manually specified?
297 297 keys = set(self.config.EngineFactory.keys())
298 298 keys = keys.union(set(self.config.RegistrationFactory.keys()))
299 299
300 300 if keys.intersection(set(['ip', 'url', 'port'])):
301 301 # Connection info was specified, don't wait for the file
302 302 url_specified = True
303 303 self.wait_for_url_file = 0
304 304 else:
305 305 url_specified = False
306 306
307 307 if self.wait_for_url_file and not os.path.exists(self.url_file):
308 308 self.log.warn("url_file %r not found", self.url_file)
309 309 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
310 310 tic = time.time()
311 311 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
312 312 # wait for url_file to exist, or until time limit
313 313 time.sleep(0.1)
314 314
315 315 if os.path.exists(self.url_file):
316 316 self.load_connector_file()
317 317 elif not url_specified:
318 318 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
319 319 self.exit(1)
320 320
321 321 exec_lines = []
322 322 for app in ('IPKernelApp', 'InteractiveShellApp'):
323 if '%s.exec_lines' in config:
324 exec_lines = config.IPKernelApp.exec_lines = config[app].exec_lines
323 if '%s.exec_lines' % app in config:
324 exec_lines = config[app].exec_lines
325 325 break
326 326
327 327 exec_files = []
328 328 for app in ('IPKernelApp', 'InteractiveShellApp'):
329 if '%s.exec_files' in config:
330 exec_files = config.IPKernelApp.exec_files = config[app].exec_files
329 if '%s.exec_files' % app in config:
330 exec_files = config[app].exec_files
331 331 break
332 332
333 config.IPKernelApp.exec_lines = exec_lines
334 config.IPKernelApp.exec_files = exec_files
335
333 336 if self.startup_script:
334 337 exec_files.append(self.startup_script)
335 338 if self.startup_command:
336 339 exec_lines.append(self.startup_command)
337 340
338 341 # Create the underlying shell class and Engine
339 342 # shell_class = import_item(self.master_config.Global.shell_class)
340 343 # print self.config
341 344 try:
342 345 self.engine = EngineFactory(config=config, log=self.log,
343 346 connection_info=self.connection_info,
344 347 )
345 348 except:
346 349 self.log.error("Couldn't start the Engine", exc_info=True)
347 350 self.exit(1)
348 351
349 352 def forward_logging(self):
350 353 if self.log_url:
351 354 self.log.info("Forwarding logging to %s", self.log_url)
352 355 context = self.engine.context
353 356 lsock = context.socket(zmq.PUB)
354 357 lsock.connect(self.log_url)
355 358 handler = EnginePUBHandler(self.engine, lsock)
356 359 handler.setLevel(self.log_level)
357 360 self.log.addHandler(handler)
358 361
359 362 def init_mpi(self):
360 363 global mpi
361 364 self.mpi = MPI(parent=self)
362 365
363 366 mpi_import_statement = self.mpi.init_script
364 367 if mpi_import_statement:
365 368 try:
366 369 self.log.info("Initializing MPI:")
367 370 self.log.info(mpi_import_statement)
368 371 exec(mpi_import_statement, globals())
369 372 except:
370 373 mpi = None
371 374 else:
372 375 mpi = None
373 376
374 377 @catch_config_error
375 378 def initialize(self, argv=None):
376 379 super(IPEngineApp, self).initialize(argv)
377 380 self.init_mpi()
378 381 self.init_engine()
379 382 self.forward_logging()
380 383
381 384 def start(self):
382 385 self.engine.start()
383 386 try:
384 387 self.engine.loop.start()
385 388 except KeyboardInterrupt:
386 389 self.log.critical("Engine Interrupted, shutting down...\n")
387 390
388 391
389 392 launch_new_instance = IPEngineApp.launch_instance
390 393
391 394
392 395 if __name__ == '__main__':
393 396 launch_new_instance()
394 397
General Comments 0
You need to be logged in to leave comments. Login now