##// END OF EJS Templates
Backport PR #2490: add ZMQInteractiveShell to IPEngineApp class list...
MinRK -
Show More
@@ -1,384 +1,385 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 catch_config_error,
37 catch_config_error,
38 )
38 )
39 from IPython.zmq.log import EnginePUBHandler
39 from IPython.zmq.log import EnginePUBHandler
40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
41 from IPython.zmq.session import (
41 from IPython.zmq.session import (
42 Session, session_aliases, session_flags
42 Session, session_aliases, session_flags
43 )
43 )
44 from IPython.zmq.zmqshell import ZMQInteractiveShell
44
45
45 from IPython.config.configurable import Configurable
46 from IPython.config.configurable import Configurable
46
47
47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url
49 from IPython.parallel.util import disambiguate_url
49
50
50 from IPython.utils.importstring import import_item
51 from IPython.utils.importstring import import_item
51 from IPython.utils.py3compat import cast_bytes
52 from IPython.utils.py3compat import cast_bytes
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
53 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
53
54
54
55
55 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
56 # Module level variables
57 # Module level variables
57 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
58
59
59 #: The default config file name for this application
60 #: The default config file name for this application
60 default_config_file_name = u'ipengine_config.py'
61 default_config_file_name = u'ipengine_config.py'
61
62
62 _description = """Start an IPython engine for parallel computing.
63 _description = """Start an IPython engine for parallel computing.
63
64
64 IPython engines run in parallel and perform computations on behalf of a client
65 IPython engines run in parallel and perform computations on behalf of a client
65 and controller. A controller needs to be started before the engines. The
66 and controller. A controller needs to be started before the engines. The
66 engine can be configured using command line options or using a cluster
67 engine can be configured using command line options or using a cluster
67 directory. Cluster directories contain config, log and security files and are
68 directory. Cluster directories contain config, log and security files and are
68 usually located in your ipython directory and named as "profile_name".
69 usually located in your ipython directory and named as "profile_name".
69 See the `profile` and `profile-dir` options for details.
70 See the `profile` and `profile-dir` options for details.
70 """
71 """
71
72
72 _examples = """
73 _examples = """
73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 """
76 """
76
77
77 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
78 # MPI configuration
79 # MPI configuration
79 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
80
81
81 mpi4py_init = """from mpi4py import MPI as mpi
82 mpi4py_init = """from mpi4py import MPI as mpi
82 mpi.size = mpi.COMM_WORLD.Get_size()
83 mpi.size = mpi.COMM_WORLD.Get_size()
83 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 """
85 """
85
86
86
87
87 pytrilinos_init = """from PyTrilinos import Epetra
88 pytrilinos_init = """from PyTrilinos import Epetra
88 class SimpleStruct:
89 class SimpleStruct:
89 pass
90 pass
90 mpi = SimpleStruct()
91 mpi = SimpleStruct()
91 mpi.rank = 0
92 mpi.rank = 0
92 mpi.size = 0
93 mpi.size = 0
93 """
94 """
94
95
95 class MPI(Configurable):
96 class MPI(Configurable):
96 """Configurable for MPI initialization"""
97 """Configurable for MPI initialization"""
97 use = Unicode('', config=True,
98 use = Unicode('', config=True,
98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 )
100 )
100
101
101 def _use_changed(self, name, old, new):
102 def _use_changed(self, name, old, new):
102 # load default init script if it's not set
103 # load default init script if it's not set
103 if not self.init_script:
104 if not self.init_script:
104 self.init_script = self.default_inits.get(new, '')
105 self.init_script = self.default_inits.get(new, '')
105
106
106 init_script = Unicode('', config=True,
107 init_script = Unicode('', config=True,
107 help="Initialization code for MPI")
108 help="Initialization code for MPI")
108
109
109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 config=True)
111 config=True)
111
112
112
113
113 #-----------------------------------------------------------------------------
114 #-----------------------------------------------------------------------------
114 # Main application
115 # Main application
115 #-----------------------------------------------------------------------------
116 #-----------------------------------------------------------------------------
116 aliases = dict(
117 aliases = dict(
117 file = 'IPEngineApp.url_file',
118 file = 'IPEngineApp.url_file',
118 c = 'IPEngineApp.startup_command',
119 c = 'IPEngineApp.startup_command',
119 s = 'IPEngineApp.startup_script',
120 s = 'IPEngineApp.startup_script',
120
121
121 url = 'EngineFactory.url',
122 url = 'EngineFactory.url',
122 ssh = 'EngineFactory.sshserver',
123 ssh = 'EngineFactory.sshserver',
123 sshkey = 'EngineFactory.sshkey',
124 sshkey = 'EngineFactory.sshkey',
124 ip = 'EngineFactory.ip',
125 ip = 'EngineFactory.ip',
125 transport = 'EngineFactory.transport',
126 transport = 'EngineFactory.transport',
126 port = 'EngineFactory.regport',
127 port = 'EngineFactory.regport',
127 location = 'EngineFactory.location',
128 location = 'EngineFactory.location',
128
129
129 timeout = 'EngineFactory.timeout',
130 timeout = 'EngineFactory.timeout',
130
131
131 mpi = 'MPI.use',
132 mpi = 'MPI.use',
132
133
133 )
134 )
134 aliases.update(base_aliases)
135 aliases.update(base_aliases)
135 aliases.update(session_aliases)
136 aliases.update(session_aliases)
136 flags = {}
137 flags = {}
137 flags.update(base_flags)
138 flags.update(base_flags)
138 flags.update(session_flags)
139 flags.update(session_flags)
139
140
140 class IPEngineApp(BaseParallelApplication):
141 class IPEngineApp(BaseParallelApplication):
141
142
142 name = 'ipengine'
143 name = 'ipengine'
143 description = _description
144 description = _description
144 examples = _examples
145 examples = _examples
145 config_file_name = Unicode(default_config_file_name)
146 config_file_name = Unicode(default_config_file_name)
146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
147 classes = List([ZMQInteractiveShell, ProfileDir, Session, EngineFactory, Kernel, MPI])
147
148
148 startup_script = Unicode(u'', config=True,
149 startup_script = Unicode(u'', config=True,
149 help='specify a script to be run at startup')
150 help='specify a script to be run at startup')
150 startup_command = Unicode('', config=True,
151 startup_command = Unicode('', config=True,
151 help='specify a command to be run at startup')
152 help='specify a command to be run at startup')
152
153
153 url_file = Unicode(u'', config=True,
154 url_file = Unicode(u'', config=True,
154 help="""The full location of the file containing the connection information for
155 help="""The full location of the file containing the connection information for
155 the controller. If this is not given, the file must be in the
156 the controller. If this is not given, the file must be in the
156 security directory of the cluster directory. This location is
157 security directory of the cluster directory. This location is
157 resolved using the `profile` or `profile_dir` options.""",
158 resolved using the `profile` or `profile_dir` options.""",
158 )
159 )
159 wait_for_url_file = Float(5, config=True,
160 wait_for_url_file = Float(5, config=True,
160 help="""The maximum number of seconds to wait for url_file to exist.
161 help="""The maximum number of seconds to wait for url_file to exist.
161 This is useful for batch-systems and shared-filesystems where the
162 This is useful for batch-systems and shared-filesystems where the
162 controller and engine are started at the same time and it
163 controller and engine are started at the same time and it
163 may take a moment for the controller to write the connector files.""")
164 may take a moment for the controller to write the connector files.""")
164
165
165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166
167
167 def _cluster_id_changed(self, name, old, new):
168 def _cluster_id_changed(self, name, old, new):
168 if new:
169 if new:
169 base = 'ipcontroller-%s' % new
170 base = 'ipcontroller-%s' % new
170 else:
171 else:
171 base = 'ipcontroller'
172 base = 'ipcontroller'
172 self.url_file_name = "%s-engine.json" % base
173 self.url_file_name = "%s-engine.json" % base
173
174
174 log_url = Unicode('', config=True,
175 log_url = Unicode('', config=True,
175 help="""The URL for the iploggerapp instance, for forwarding
176 help="""The URL for the iploggerapp instance, for forwarding
176 logging to a central location.""")
177 logging to a central location.""")
177
178
178 # an IPKernelApp instance, used to setup listening for shell frontends
179 # an IPKernelApp instance, used to setup listening for shell frontends
179 kernel_app = Instance(IPKernelApp)
180 kernel_app = Instance(IPKernelApp)
180
181
181 aliases = Dict(aliases)
182 aliases = Dict(aliases)
182 flags = Dict(flags)
183 flags = Dict(flags)
183
184
184 @property
185 @property
185 def kernel(self):
186 def kernel(self):
186 """allow access to the Kernel object, so I look like IPKernelApp"""
187 """allow access to the Kernel object, so I look like IPKernelApp"""
187 return self.engine.kernel
188 return self.engine.kernel
188
189
189 def find_url_file(self):
190 def find_url_file(self):
190 """Set the url file.
191 """Set the url file.
191
192
192 Here we don't try to actually see if it exists for is valid as that
193 Here we don't try to actually see if it exists for is valid as that
193 is hadled by the connection logic.
194 is hadled by the connection logic.
194 """
195 """
195 config = self.config
196 config = self.config
196 # Find the actual controller key file
197 # Find the actual controller key file
197 if not self.url_file:
198 if not self.url_file:
198 self.url_file = os.path.join(
199 self.url_file = os.path.join(
199 self.profile_dir.security_dir,
200 self.profile_dir.security_dir,
200 self.url_file_name
201 self.url_file_name
201 )
202 )
202
203
203 def load_connector_file(self):
204 def load_connector_file(self):
204 """load config from a JSON connector file,
205 """load config from a JSON connector file,
205 at a *lower* priority than command-line/config files.
206 at a *lower* priority than command-line/config files.
206 """
207 """
207
208
208 self.log.info("Loading url_file %r", self.url_file)
209 self.log.info("Loading url_file %r", self.url_file)
209 config = self.config
210 config = self.config
210
211
211 with open(self.url_file) as f:
212 with open(self.url_file) as f:
212 d = json.loads(f.read())
213 d = json.loads(f.read())
213
214
214 if 'exec_key' in d:
215 if 'exec_key' in d:
215 config.Session.key = cast_bytes(d['exec_key'])
216 config.Session.key = cast_bytes(d['exec_key'])
216
217
217 try:
218 try:
218 config.EngineFactory.location
219 config.EngineFactory.location
219 except AttributeError:
220 except AttributeError:
220 config.EngineFactory.location = d['location']
221 config.EngineFactory.location = d['location']
221
222
222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 try:
224 try:
224 config.EngineFactory.url
225 config.EngineFactory.url
225 except AttributeError:
226 except AttributeError:
226 config.EngineFactory.url = d['url']
227 config.EngineFactory.url = d['url']
227
228
228 try:
229 try:
229 config.EngineFactory.sshserver
230 config.EngineFactory.sshserver
230 except AttributeError:
231 except AttributeError:
231 config.EngineFactory.sshserver = d['ssh']
232 config.EngineFactory.sshserver = d['ssh']
232
233
233 def bind_kernel(self, **kwargs):
234 def bind_kernel(self, **kwargs):
234 """Promote engine to listening kernel, accessible to frontends."""
235 """Promote engine to listening kernel, accessible to frontends."""
235 if self.kernel_app is not None:
236 if self.kernel_app is not None:
236 return
237 return
237
238
238 self.log.info("Opening ports for direct connections as an IPython kernel")
239 self.log.info("Opening ports for direct connections as an IPython kernel")
239
240
240 kernel = self.kernel
241 kernel = self.kernel
241
242
242 kwargs.setdefault('config', self.config)
243 kwargs.setdefault('config', self.config)
243 kwargs.setdefault('log', self.log)
244 kwargs.setdefault('log', self.log)
244 kwargs.setdefault('profile_dir', self.profile_dir)
245 kwargs.setdefault('profile_dir', self.profile_dir)
245 kwargs.setdefault('session', self.engine.session)
246 kwargs.setdefault('session', self.engine.session)
246
247
247 app = self.kernel_app = IPKernelApp(**kwargs)
248 app = self.kernel_app = IPKernelApp(**kwargs)
248
249
249 # allow IPKernelApp.instance():
250 # allow IPKernelApp.instance():
250 IPKernelApp._instance = app
251 IPKernelApp._instance = app
251
252
252 app.init_connection_file()
253 app.init_connection_file()
253 # relevant contents of init_sockets:
254 # relevant contents of init_sockets:
254
255
255 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
256 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
256 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
257 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
257
258
258 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
259 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
259 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
260 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
260
261
261 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
262 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
262 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
263 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
263 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
264 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
264
265
265 # start the heartbeat, and log connection info:
266 # start the heartbeat, and log connection info:
266
267
267 app.init_heartbeat()
268 app.init_heartbeat()
268
269
269 app.log_connection_info()
270 app.log_connection_info()
270 app.write_connection_file()
271 app.write_connection_file()
271
272
272
273
273 def init_engine(self):
274 def init_engine(self):
274 # This is the working dir by now.
275 # This is the working dir by now.
275 sys.path.insert(0, '')
276 sys.path.insert(0, '')
276 config = self.config
277 config = self.config
277 # print config
278 # print config
278 self.find_url_file()
279 self.find_url_file()
279
280
280 # was the url manually specified?
281 # was the url manually specified?
281 keys = set(self.config.EngineFactory.keys())
282 keys = set(self.config.EngineFactory.keys())
282 keys = keys.union(set(self.config.RegistrationFactory.keys()))
283 keys = keys.union(set(self.config.RegistrationFactory.keys()))
283
284
284 if keys.intersection(set(['ip', 'url', 'port'])):
285 if keys.intersection(set(['ip', 'url', 'port'])):
285 # Connection info was specified, don't wait for the file
286 # Connection info was specified, don't wait for the file
286 url_specified = True
287 url_specified = True
287 self.wait_for_url_file = 0
288 self.wait_for_url_file = 0
288 else:
289 else:
289 url_specified = False
290 url_specified = False
290
291
291 if self.wait_for_url_file and not os.path.exists(self.url_file):
292 if self.wait_for_url_file and not os.path.exists(self.url_file):
292 self.log.warn("url_file %r not found", self.url_file)
293 self.log.warn("url_file %r not found", self.url_file)
293 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
294 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
294 tic = time.time()
295 tic = time.time()
295 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
296 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
296 # wait for url_file to exist, or until time limit
297 # wait for url_file to exist, or until time limit
297 time.sleep(0.1)
298 time.sleep(0.1)
298
299
299 if os.path.exists(self.url_file):
300 if os.path.exists(self.url_file):
300 self.load_connector_file()
301 self.load_connector_file()
301 elif not url_specified:
302 elif not url_specified:
302 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
303 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
303 self.exit(1)
304 self.exit(1)
304
305
305
306
306 try:
307 try:
307 exec_lines = config.IPKernelApp.exec_lines
308 exec_lines = config.IPKernelApp.exec_lines
308 except AttributeError:
309 except AttributeError:
309 try:
310 try:
310 exec_lines = config.InteractiveShellApp.exec_lines
311 exec_lines = config.InteractiveShellApp.exec_lines
311 except AttributeError:
312 except AttributeError:
312 exec_lines = config.IPKernelApp.exec_lines = []
313 exec_lines = config.IPKernelApp.exec_lines = []
313 try:
314 try:
314 exec_files = config.IPKernelApp.exec_files
315 exec_files = config.IPKernelApp.exec_files
315 except AttributeError:
316 except AttributeError:
316 try:
317 try:
317 exec_files = config.InteractiveShellApp.exec_files
318 exec_files = config.InteractiveShellApp.exec_files
318 except AttributeError:
319 except AttributeError:
319 exec_files = config.IPKernelApp.exec_files = []
320 exec_files = config.IPKernelApp.exec_files = []
320
321
321 if self.startup_script:
322 if self.startup_script:
322 exec_files.append(self.startup_script)
323 exec_files.append(self.startup_script)
323 if self.startup_command:
324 if self.startup_command:
324 exec_lines.append(self.startup_command)
325 exec_lines.append(self.startup_command)
325
326
326 # Create the underlying shell class and Engine
327 # Create the underlying shell class and Engine
327 # shell_class = import_item(self.master_config.Global.shell_class)
328 # shell_class = import_item(self.master_config.Global.shell_class)
328 # print self.config
329 # print self.config
329 try:
330 try:
330 self.engine = EngineFactory(config=config, log=self.log)
331 self.engine = EngineFactory(config=config, log=self.log)
331 except:
332 except:
332 self.log.error("Couldn't start the Engine", exc_info=True)
333 self.log.error("Couldn't start the Engine", exc_info=True)
333 self.exit(1)
334 self.exit(1)
334
335
335 def forward_logging(self):
336 def forward_logging(self):
336 if self.log_url:
337 if self.log_url:
337 self.log.info("Forwarding logging to %s", self.log_url)
338 self.log.info("Forwarding logging to %s", self.log_url)
338 context = self.engine.context
339 context = self.engine.context
339 lsock = context.socket(zmq.PUB)
340 lsock = context.socket(zmq.PUB)
340 lsock.connect(self.log_url)
341 lsock.connect(self.log_url)
341 handler = EnginePUBHandler(self.engine, lsock)
342 handler = EnginePUBHandler(self.engine, lsock)
342 handler.setLevel(self.log_level)
343 handler.setLevel(self.log_level)
343 self.log.addHandler(handler)
344 self.log.addHandler(handler)
344
345
345 def init_mpi(self):
346 def init_mpi(self):
346 global mpi
347 global mpi
347 self.mpi = MPI(config=self.config)
348 self.mpi = MPI(config=self.config)
348
349
349 mpi_import_statement = self.mpi.init_script
350 mpi_import_statement = self.mpi.init_script
350 if mpi_import_statement:
351 if mpi_import_statement:
351 try:
352 try:
352 self.log.info("Initializing MPI:")
353 self.log.info("Initializing MPI:")
353 self.log.info(mpi_import_statement)
354 self.log.info(mpi_import_statement)
354 exec mpi_import_statement in globals()
355 exec mpi_import_statement in globals()
355 except:
356 except:
356 mpi = None
357 mpi = None
357 else:
358 else:
358 mpi = None
359 mpi = None
359
360
360 @catch_config_error
361 @catch_config_error
361 def initialize(self, argv=None):
362 def initialize(self, argv=None):
362 super(IPEngineApp, self).initialize(argv)
363 super(IPEngineApp, self).initialize(argv)
363 self.init_mpi()
364 self.init_mpi()
364 self.init_engine()
365 self.init_engine()
365 self.forward_logging()
366 self.forward_logging()
366
367
367 def start(self):
368 def start(self):
368 self.engine.start()
369 self.engine.start()
369 try:
370 try:
370 self.engine.loop.start()
371 self.engine.loop.start()
371 except KeyboardInterrupt:
372 except KeyboardInterrupt:
372 self.log.critical("Engine Interrupted, shutting down...\n")
373 self.log.critical("Engine Interrupted, shutting down...\n")
373
374
374
375
375 def launch_new_instance():
376 def launch_new_instance():
376 """Create and run the IPython engine"""
377 """Create and run the IPython engine"""
377 app = IPEngineApp.instance()
378 app = IPEngineApp.instance()
378 app.initialize()
379 app.initialize()
379 app.start()
380 app.start()
380
381
381
382
382 if __name__ == '__main__':
383 if __name__ == '__main__':
383 launch_new_instance()
384 launch_new_instance()
384
385
General Comments 0
You need to be logged in to leave comments. Login now