##// END OF EJS Templates
Backport PR #2490: add ZMQInteractiveShell to IPEngineApp class list...
MinRK -
Show More
@@ -1,384 +1,385 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 catch_config_error,
38 38 )
39 39 from IPython.zmq.log import EnginePUBHandler
40 40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
41 41 from IPython.zmq.session import (
42 42 Session, session_aliases, session_flags
43 43 )
44 from IPython.zmq.zmqshell import ZMQInteractiveShell
44 45
45 46 from IPython.config.configurable import Configurable
46 47
47 48 from IPython.parallel.engine.engine import EngineFactory
48 49 from IPython.parallel.util import disambiguate_url
49 50
50 51 from IPython.utils.importstring import import_item
51 52 from IPython.utils.py3compat import cast_bytes
52 53 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
53 54
54 55
55 56 #-----------------------------------------------------------------------------
56 57 # Module level variables
57 58 #-----------------------------------------------------------------------------
58 59
59 60 #: The default config file name for this application
60 61 default_config_file_name = u'ipengine_config.py'
61 62
62 63 _description = """Start an IPython engine for parallel computing.
63 64
64 65 IPython engines run in parallel and perform computations on behalf of a client
65 66 and controller. A controller needs to be started before the engines. The
66 67 engine can be configured using command line options or using a cluster
67 68 directory. Cluster directories contain config, log and security files and are
68 69 usually located in your ipython directory and named as "profile_name".
69 70 See the `profile` and `profile-dir` options for details.
70 71 """
71 72
72 73 _examples = """
73 74 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 75 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 76 """
76 77
77 78 #-----------------------------------------------------------------------------
78 79 # MPI configuration
79 80 #-----------------------------------------------------------------------------
80 81
81 82 mpi4py_init = """from mpi4py import MPI as mpi
82 83 mpi.size = mpi.COMM_WORLD.Get_size()
83 84 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 85 """
85 86
86 87
87 88 pytrilinos_init = """from PyTrilinos import Epetra
88 89 class SimpleStruct:
89 90 pass
90 91 mpi = SimpleStruct()
91 92 mpi.rank = 0
92 93 mpi.size = 0
93 94 """
94 95
95 96 class MPI(Configurable):
96 97 """Configurable for MPI initialization"""
97 98 use = Unicode('', config=True,
98 99 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 100 )
100 101
101 102 def _use_changed(self, name, old, new):
102 103 # load default init script if it's not set
103 104 if not self.init_script:
104 105 self.init_script = self.default_inits.get(new, '')
105 106
106 107 init_script = Unicode('', config=True,
107 108 help="Initialization code for MPI")
108 109
109 110 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 111 config=True)
111 112
112 113
113 114 #-----------------------------------------------------------------------------
114 115 # Main application
115 116 #-----------------------------------------------------------------------------
116 117 aliases = dict(
117 118 file = 'IPEngineApp.url_file',
118 119 c = 'IPEngineApp.startup_command',
119 120 s = 'IPEngineApp.startup_script',
120 121
121 122 url = 'EngineFactory.url',
122 123 ssh = 'EngineFactory.sshserver',
123 124 sshkey = 'EngineFactory.sshkey',
124 125 ip = 'EngineFactory.ip',
125 126 transport = 'EngineFactory.transport',
126 127 port = 'EngineFactory.regport',
127 128 location = 'EngineFactory.location',
128 129
129 130 timeout = 'EngineFactory.timeout',
130 131
131 132 mpi = 'MPI.use',
132 133
133 134 )
134 135 aliases.update(base_aliases)
135 136 aliases.update(session_aliases)
136 137 flags = {}
137 138 flags.update(base_flags)
138 139 flags.update(session_flags)
139 140
140 141 class IPEngineApp(BaseParallelApplication):
141 142
142 143 name = 'ipengine'
143 144 description = _description
144 145 examples = _examples
145 146 config_file_name = Unicode(default_config_file_name)
146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
147 classes = List([ZMQInteractiveShell, ProfileDir, Session, EngineFactory, Kernel, MPI])
147 148
148 149 startup_script = Unicode(u'', config=True,
149 150 help='specify a script to be run at startup')
150 151 startup_command = Unicode('', config=True,
151 152 help='specify a command to be run at startup')
152 153
153 154 url_file = Unicode(u'', config=True,
154 155 help="""The full location of the file containing the connection information for
155 156 the controller. If this is not given, the file must be in the
156 157 security directory of the cluster directory. This location is
157 158 resolved using the `profile` or `profile_dir` options.""",
158 159 )
159 160 wait_for_url_file = Float(5, config=True,
160 161 help="""The maximum number of seconds to wait for url_file to exist.
161 162 This is useful for batch-systems and shared-filesystems where the
162 163 controller and engine are started at the same time and it
163 164 may take a moment for the controller to write the connector files.""")
164 165
165 166 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166 167
167 168 def _cluster_id_changed(self, name, old, new):
168 169 if new:
169 170 base = 'ipcontroller-%s' % new
170 171 else:
171 172 base = 'ipcontroller'
172 173 self.url_file_name = "%s-engine.json" % base
173 174
174 175 log_url = Unicode('', config=True,
175 176 help="""The URL for the iploggerapp instance, for forwarding
176 177 logging to a central location.""")
177 178
178 179 # an IPKernelApp instance, used to setup listening for shell frontends
179 180 kernel_app = Instance(IPKernelApp)
180 181
181 182 aliases = Dict(aliases)
182 183 flags = Dict(flags)
183 184
184 185 @property
185 186 def kernel(self):
186 187 """allow access to the Kernel object, so I look like IPKernelApp"""
187 188 return self.engine.kernel
188 189
189 190 def find_url_file(self):
190 191 """Set the url file.
191 192
192 193 Here we don't try to actually see if it exists for is valid as that
193 194 is hadled by the connection logic.
194 195 """
195 196 config = self.config
196 197 # Find the actual controller key file
197 198 if not self.url_file:
198 199 self.url_file = os.path.join(
199 200 self.profile_dir.security_dir,
200 201 self.url_file_name
201 202 )
202 203
203 204 def load_connector_file(self):
204 205 """load config from a JSON connector file,
205 206 at a *lower* priority than command-line/config files.
206 207 """
207 208
208 209 self.log.info("Loading url_file %r", self.url_file)
209 210 config = self.config
210 211
211 212 with open(self.url_file) as f:
212 213 d = json.loads(f.read())
213 214
214 215 if 'exec_key' in d:
215 216 config.Session.key = cast_bytes(d['exec_key'])
216 217
217 218 try:
218 219 config.EngineFactory.location
219 220 except AttributeError:
220 221 config.EngineFactory.location = d['location']
221 222
222 223 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 224 try:
224 225 config.EngineFactory.url
225 226 except AttributeError:
226 227 config.EngineFactory.url = d['url']
227 228
228 229 try:
229 230 config.EngineFactory.sshserver
230 231 except AttributeError:
231 232 config.EngineFactory.sshserver = d['ssh']
232 233
233 234 def bind_kernel(self, **kwargs):
234 235 """Promote engine to listening kernel, accessible to frontends."""
235 236 if self.kernel_app is not None:
236 237 return
237 238
238 239 self.log.info("Opening ports for direct connections as an IPython kernel")
239 240
240 241 kernel = self.kernel
241 242
242 243 kwargs.setdefault('config', self.config)
243 244 kwargs.setdefault('log', self.log)
244 245 kwargs.setdefault('profile_dir', self.profile_dir)
245 246 kwargs.setdefault('session', self.engine.session)
246 247
247 248 app = self.kernel_app = IPKernelApp(**kwargs)
248 249
249 250 # allow IPKernelApp.instance():
250 251 IPKernelApp._instance = app
251 252
252 253 app.init_connection_file()
253 254 # relevant contents of init_sockets:
254 255
255 256 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
256 257 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
257 258
258 259 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
259 260 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
260 261
261 262 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
262 263 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
263 264 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
264 265
265 266 # start the heartbeat, and log connection info:
266 267
267 268 app.init_heartbeat()
268 269
269 270 app.log_connection_info()
270 271 app.write_connection_file()
271 272
272 273
273 274 def init_engine(self):
274 275 # This is the working dir by now.
275 276 sys.path.insert(0, '')
276 277 config = self.config
277 278 # print config
278 279 self.find_url_file()
279 280
280 281 # was the url manually specified?
281 282 keys = set(self.config.EngineFactory.keys())
282 283 keys = keys.union(set(self.config.RegistrationFactory.keys()))
283 284
284 285 if keys.intersection(set(['ip', 'url', 'port'])):
285 286 # Connection info was specified, don't wait for the file
286 287 url_specified = True
287 288 self.wait_for_url_file = 0
288 289 else:
289 290 url_specified = False
290 291
291 292 if self.wait_for_url_file and not os.path.exists(self.url_file):
292 293 self.log.warn("url_file %r not found", self.url_file)
293 294 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
294 295 tic = time.time()
295 296 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
296 297 # wait for url_file to exist, or until time limit
297 298 time.sleep(0.1)
298 299
299 300 if os.path.exists(self.url_file):
300 301 self.load_connector_file()
301 302 elif not url_specified:
302 303 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
303 304 self.exit(1)
304 305
305 306
306 307 try:
307 308 exec_lines = config.IPKernelApp.exec_lines
308 309 except AttributeError:
309 310 try:
310 311 exec_lines = config.InteractiveShellApp.exec_lines
311 312 except AttributeError:
312 313 exec_lines = config.IPKernelApp.exec_lines = []
313 314 try:
314 315 exec_files = config.IPKernelApp.exec_files
315 316 except AttributeError:
316 317 try:
317 318 exec_files = config.InteractiveShellApp.exec_files
318 319 except AttributeError:
319 320 exec_files = config.IPKernelApp.exec_files = []
320 321
321 322 if self.startup_script:
322 323 exec_files.append(self.startup_script)
323 324 if self.startup_command:
324 325 exec_lines.append(self.startup_command)
325 326
326 327 # Create the underlying shell class and Engine
327 328 # shell_class = import_item(self.master_config.Global.shell_class)
328 329 # print self.config
329 330 try:
330 331 self.engine = EngineFactory(config=config, log=self.log)
331 332 except:
332 333 self.log.error("Couldn't start the Engine", exc_info=True)
333 334 self.exit(1)
334 335
335 336 def forward_logging(self):
336 337 if self.log_url:
337 338 self.log.info("Forwarding logging to %s", self.log_url)
338 339 context = self.engine.context
339 340 lsock = context.socket(zmq.PUB)
340 341 lsock.connect(self.log_url)
341 342 handler = EnginePUBHandler(self.engine, lsock)
342 343 handler.setLevel(self.log_level)
343 344 self.log.addHandler(handler)
344 345
345 346 def init_mpi(self):
346 347 global mpi
347 348 self.mpi = MPI(config=self.config)
348 349
349 350 mpi_import_statement = self.mpi.init_script
350 351 if mpi_import_statement:
351 352 try:
352 353 self.log.info("Initializing MPI:")
353 354 self.log.info(mpi_import_statement)
354 355 exec mpi_import_statement in globals()
355 356 except:
356 357 mpi = None
357 358 else:
358 359 mpi = None
359 360
360 361 @catch_config_error
361 362 def initialize(self, argv=None):
362 363 super(IPEngineApp, self).initialize(argv)
363 364 self.init_mpi()
364 365 self.init_engine()
365 366 self.forward_logging()
366 367
367 368 def start(self):
368 369 self.engine.start()
369 370 try:
370 371 self.engine.loop.start()
371 372 except KeyboardInterrupt:
372 373 self.log.critical("Engine Interrupted, shutting down...\n")
373 374
374 375
375 376 def launch_new_instance():
376 377 """Create and run the IPython engine"""
377 378 app = IPEngineApp.instance()
378 379 app.initialize()
379 380 app.start()
380 381
381 382
382 383 if __name__ == '__main__':
383 384 launch_new_instance()
384 385
General Comments 0
You need to be logged in to leave comments. Login now