##// END OF EJS Templates
Merge pull request #2269 from minrk/disambiguate_ip...
Fernando Perez -
r8158:fb439f46 merge
parent child Browse files
Show More
@@ -1,398 +1,398
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 catch_config_error,
38 38 )
39 39 from IPython.zmq.log import EnginePUBHandler
40 40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
41 41 from IPython.zmq.session import (
42 42 Session, session_aliases, session_flags
43 43 )
44 44
45 45 from IPython.config.configurable import Configurable
46 46
47 47 from IPython.parallel.engine.engine import EngineFactory
48 48 from IPython.parallel.util import disambiguate_ip_address
49 49
50 50 from IPython.utils.importstring import import_item
51 51 from IPython.utils.py3compat import cast_bytes
52 52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
53 53
54 54
55 55 #-----------------------------------------------------------------------------
56 56 # Module level variables
57 57 #-----------------------------------------------------------------------------
58 58
59 59 #: The default config file name for this application
60 60 default_config_file_name = u'ipengine_config.py'
61 61
62 62 _description = """Start an IPython engine for parallel computing.
63 63
64 64 IPython engines run in parallel and perform computations on behalf of a client
65 65 and controller. A controller needs to be started before the engines. The
66 66 engine can be configured using command line options or using a cluster
67 67 directory. Cluster directories contain config, log and security files and are
68 68 usually located in your ipython directory and named as "profile_name".
69 69 See the `profile` and `profile-dir` options for details.
70 70 """
71 71
72 72 _examples = """
73 73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 75 """
76 76
77 77 #-----------------------------------------------------------------------------
78 78 # MPI configuration
79 79 #-----------------------------------------------------------------------------
80 80
81 81 mpi4py_init = """from mpi4py import MPI as mpi
82 82 mpi.size = mpi.COMM_WORLD.Get_size()
83 83 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 84 """
85 85
86 86
87 87 pytrilinos_init = """from PyTrilinos import Epetra
88 88 class SimpleStruct:
89 89 pass
90 90 mpi = SimpleStruct()
91 91 mpi.rank = 0
92 92 mpi.size = 0
93 93 """
94 94
95 95 class MPI(Configurable):
96 96 """Configurable for MPI initialization"""
97 97 use = Unicode('', config=True,
98 98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 99 )
100 100
101 101 def _use_changed(self, name, old, new):
102 102 # load default init script if it's not set
103 103 if not self.init_script:
104 104 self.init_script = self.default_inits.get(new, '')
105 105
106 106 init_script = Unicode('', config=True,
107 107 help="Initialization code for MPI")
108 108
109 109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 110 config=True)
111 111
112 112
113 113 #-----------------------------------------------------------------------------
114 114 # Main application
115 115 #-----------------------------------------------------------------------------
116 116 aliases = dict(
117 117 file = 'IPEngineApp.url_file',
118 118 c = 'IPEngineApp.startup_command',
119 119 s = 'IPEngineApp.startup_script',
120 120
121 121 url = 'EngineFactory.url',
122 122 ssh = 'EngineFactory.sshserver',
123 123 sshkey = 'EngineFactory.sshkey',
124 124 ip = 'EngineFactory.ip',
125 125 transport = 'EngineFactory.transport',
126 126 port = 'EngineFactory.regport',
127 127 location = 'EngineFactory.location',
128 128
129 129 timeout = 'EngineFactory.timeout',
130 130
131 131 mpi = 'MPI.use',
132 132
133 133 )
134 134 aliases.update(base_aliases)
135 135 aliases.update(session_aliases)
136 136 flags = {}
137 137 flags.update(base_flags)
138 138 flags.update(session_flags)
139 139
140 140 class IPEngineApp(BaseParallelApplication):
141 141
142 142 name = 'ipengine'
143 143 description = _description
144 144 examples = _examples
145 145 config_file_name = Unicode(default_config_file_name)
146 146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
147 147
148 148 startup_script = Unicode(u'', config=True,
149 149 help='specify a script to be run at startup')
150 150 startup_command = Unicode('', config=True,
151 151 help='specify a command to be run at startup')
152 152
153 153 url_file = Unicode(u'', config=True,
154 154 help="""The full location of the file containing the connection information for
155 155 the controller. If this is not given, the file must be in the
156 156 security directory of the cluster directory. This location is
157 157 resolved using the `profile` or `profile_dir` options.""",
158 158 )
159 159 wait_for_url_file = Float(5, config=True,
160 160 help="""The maximum number of seconds to wait for url_file to exist.
161 161 This is useful for batch-systems and shared-filesystems where the
162 162 controller and engine are started at the same time and it
163 163 may take a moment for the controller to write the connector files.""")
164 164
165 165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166 166
167 167 def _cluster_id_changed(self, name, old, new):
168 168 if new:
169 169 base = 'ipcontroller-%s' % new
170 170 else:
171 171 base = 'ipcontroller'
172 172 self.url_file_name = "%s-engine.json" % base
173 173
174 174 log_url = Unicode('', config=True,
175 175 help="""The URL for the iploggerapp instance, for forwarding
176 176 logging to a central location.""")
177 177
178 178 # an IPKernelApp instance, used to setup listening for shell frontends
179 179 kernel_app = Instance(IPKernelApp)
180 180
181 181 aliases = Dict(aliases)
182 182 flags = Dict(flags)
183 183
184 184 @property
185 185 def kernel(self):
186 186 """allow access to the Kernel object, so I look like IPKernelApp"""
187 187 return self.engine.kernel
188 188
189 189 def find_url_file(self):
190 190 """Set the url file.
191 191
192 192 Here we don't try to actually see if it exists for is valid as that
193 193 is hadled by the connection logic.
194 194 """
195 195 config = self.config
196 196 # Find the actual controller key file
197 197 if not self.url_file:
198 198 self.url_file = os.path.join(
199 199 self.profile_dir.security_dir,
200 200 self.url_file_name
201 201 )
202 202
203 203 def load_connector_file(self):
204 204 """load config from a JSON connector file,
205 205 at a *lower* priority than command-line/config files.
206 206 """
207 207
208 208 self.log.info("Loading url_file %r", self.url_file)
209 209 config = self.config
210 210
211 211 with open(self.url_file) as f:
212 212 d = json.loads(f.read())
213 213
214 214 # allow hand-override of location for disambiguation
215 215 # and ssh-server
216 216 try:
217 217 config.EngineFactory.location
218 218 except AttributeError:
219 219 config.EngineFactory.location = d['location']
220 220
221 221 try:
222 222 config.EngineFactory.sshserver
223 223 except AttributeError:
224 224 config.EngineFactory.sshserver = d.get('ssh')
225 225
226 226 location = config.EngineFactory.location
227 227
228 228 proto, ip = d['interface'].split('://')
229 ip = disambiguate_ip_address(ip)
229 ip = disambiguate_ip_address(ip, location)
230 230 d['interface'] = '%s://%s' % (proto, ip)
231 231
232 232 # DO NOT allow override of basic URLs, serialization, or exec_key
233 233 # JSON file takes top priority there
234 234 config.Session.key = cast_bytes(d['exec_key'])
235 235
236 236 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
237 237
238 238 config.Session.packer = d['pack']
239 239 config.Session.unpacker = d['unpack']
240 240
241 241 self.log.debug("Config changed:")
242 242 self.log.debug("%r", config)
243 243 self.connection_info = d
244 244
245 245 def bind_kernel(self, **kwargs):
246 246 """Promote engine to listening kernel, accessible to frontends."""
247 247 if self.kernel_app is not None:
248 248 return
249 249
250 250 self.log.info("Opening ports for direct connections as an IPython kernel")
251 251
252 252 kernel = self.kernel
253 253
254 254 kwargs.setdefault('config', self.config)
255 255 kwargs.setdefault('log', self.log)
256 256 kwargs.setdefault('profile_dir', self.profile_dir)
257 257 kwargs.setdefault('session', self.engine.session)
258 258
259 259 app = self.kernel_app = IPKernelApp(**kwargs)
260 260
261 261 # allow IPKernelApp.instance():
262 262 IPKernelApp._instance = app
263 263
264 264 app.init_connection_file()
265 265 # relevant contents of init_sockets:
266 266
267 267 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
268 268 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
269 269
270 270 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
271 271 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
272 272
273 273 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
274 274 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
275 275 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
276 276
277 277 # start the heartbeat, and log connection info:
278 278
279 279 app.init_heartbeat()
280 280
281 281 app.log_connection_info()
282 282 app.write_connection_file()
283 283
284 284
285 285 def init_engine(self):
286 286 # This is the working dir by now.
287 287 sys.path.insert(0, '')
288 288 config = self.config
289 289 # print config
290 290 self.find_url_file()
291 291
292 292 # was the url manually specified?
293 293 keys = set(self.config.EngineFactory.keys())
294 294 keys = keys.union(set(self.config.RegistrationFactory.keys()))
295 295
296 296 if keys.intersection(set(['ip', 'url', 'port'])):
297 297 # Connection info was specified, don't wait for the file
298 298 url_specified = True
299 299 self.wait_for_url_file = 0
300 300 else:
301 301 url_specified = False
302 302
303 303 if self.wait_for_url_file and not os.path.exists(self.url_file):
304 304 self.log.warn("url_file %r not found", self.url_file)
305 305 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
306 306 tic = time.time()
307 307 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
308 308 # wait for url_file to exist, or until time limit
309 309 time.sleep(0.1)
310 310
311 311 if os.path.exists(self.url_file):
312 312 self.load_connector_file()
313 313 elif not url_specified:
314 314 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
315 315 self.exit(1)
316 316
317 317
318 318 try:
319 319 exec_lines = config.IPKernelApp.exec_lines
320 320 except AttributeError:
321 321 try:
322 322 exec_lines = config.InteractiveShellApp.exec_lines
323 323 except AttributeError:
324 324 exec_lines = config.IPKernelApp.exec_lines = []
325 325 try:
326 326 exec_files = config.IPKernelApp.exec_files
327 327 except AttributeError:
328 328 try:
329 329 exec_files = config.InteractiveShellApp.exec_files
330 330 except AttributeError:
331 331 exec_files = config.IPKernelApp.exec_files = []
332 332
333 333 if self.startup_script:
334 334 exec_files.append(self.startup_script)
335 335 if self.startup_command:
336 336 exec_lines.append(self.startup_command)
337 337
338 338 # Create the underlying shell class and Engine
339 339 # shell_class = import_item(self.master_config.Global.shell_class)
340 340 # print self.config
341 341 try:
342 342 self.engine = EngineFactory(config=config, log=self.log,
343 343 connection_info=self.connection_info,
344 344 )
345 345 except:
346 346 self.log.error("Couldn't start the Engine", exc_info=True)
347 347 self.exit(1)
348 348
349 349 def forward_logging(self):
350 350 if self.log_url:
351 351 self.log.info("Forwarding logging to %s", self.log_url)
352 352 context = self.engine.context
353 353 lsock = context.socket(zmq.PUB)
354 354 lsock.connect(self.log_url)
355 355 handler = EnginePUBHandler(self.engine, lsock)
356 356 handler.setLevel(self.log_level)
357 357 self.log.addHandler(handler)
358 358
359 359 def init_mpi(self):
360 360 global mpi
361 361 self.mpi = MPI(config=self.config)
362 362
363 363 mpi_import_statement = self.mpi.init_script
364 364 if mpi_import_statement:
365 365 try:
366 366 self.log.info("Initializing MPI:")
367 367 self.log.info(mpi_import_statement)
368 368 exec mpi_import_statement in globals()
369 369 except:
370 370 mpi = None
371 371 else:
372 372 mpi = None
373 373
374 374 @catch_config_error
375 375 def initialize(self, argv=None):
376 376 super(IPEngineApp, self).initialize(argv)
377 377 self.init_mpi()
378 378 self.init_engine()
379 379 self.forward_logging()
380 380
381 381 def start(self):
382 382 self.engine.start()
383 383 try:
384 384 self.engine.loop.start()
385 385 except KeyboardInterrupt:
386 386 self.log.critical("Engine Interrupted, shutting down...\n")
387 387
388 388
389 389 def launch_new_instance():
390 390 """Create and run the IPython engine"""
391 391 app = IPEngineApp.instance()
392 392 app.initialize()
393 393 app.start()
394 394
395 395
396 396 if __name__ == '__main__':
397 397 launch_new_instance()
398 398
General Comments 0
You need to be logged in to leave comments. Login now