##// END OF EJS Templates
add listen_kernel method to IPEngineApp
MinRK -
Show More
@@ -1,334 +1,371 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 catch_config_error,
37 catch_config_error,
38 )
38 )
39 from IPython.zmq.log import EnginePUBHandler
39 from IPython.zmq.log import EnginePUBHandler
40 from IPython.zmq.ipkernel import Kernel
40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
41 from IPython.zmq.session import (
41 from IPython.zmq.session import (
42 Session, session_aliases, session_flags
42 Session, session_aliases, session_flags
43 )
43 )
44
44
45 from IPython.config.configurable import Configurable
45 from IPython.config.configurable import Configurable
46
46
47 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url
48 from IPython.parallel.util import disambiguate_url
49
49
50 from IPython.utils.importstring import import_item
50 from IPython.utils.importstring import import_item
51 from IPython.utils.py3compat import cast_bytes
51 from IPython.utils.py3compat import cast_bytes
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
53
53
54
54
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56 # Module level variables
56 # Module level variables
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58
58
59 #: The default config file name for this application
59 #: The default config file name for this application
60 default_config_file_name = u'ipengine_config.py'
60 default_config_file_name = u'ipengine_config.py'
61
61
62 _description = """Start an IPython engine for parallel computing.
62 _description = """Start an IPython engine for parallel computing.
63
63
64 IPython engines run in parallel and perform computations on behalf of a client
64 IPython engines run in parallel and perform computations on behalf of a client
65 and controller. A controller needs to be started before the engines. The
65 and controller. A controller needs to be started before the engines. The
66 engine can be configured using command line options or using a cluster
66 engine can be configured using command line options or using a cluster
67 directory. Cluster directories contain config, log and security files and are
67 directory. Cluster directories contain config, log and security files and are
68 usually located in your ipython directory and named as "profile_name".
68 usually located in your ipython directory and named as "profile_name".
69 See the `profile` and `profile-dir` options for details.
69 See the `profile` and `profile-dir` options for details.
70 """
70 """
71
71
72 _examples = """
72 _examples = """
73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 """
75 """
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # MPI configuration
78 # MPI configuration
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80
80
81 mpi4py_init = """from mpi4py import MPI as mpi
81 mpi4py_init = """from mpi4py import MPI as mpi
82 mpi.size = mpi.COMM_WORLD.Get_size()
82 mpi.size = mpi.COMM_WORLD.Get_size()
83 mpi.rank = mpi.COMM_WORLD.Get_rank()
83 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 """
84 """
85
85
86
86
87 pytrilinos_init = """from PyTrilinos import Epetra
87 pytrilinos_init = """from PyTrilinos import Epetra
88 class SimpleStruct:
88 class SimpleStruct:
89 pass
89 pass
90 mpi = SimpleStruct()
90 mpi = SimpleStruct()
91 mpi.rank = 0
91 mpi.rank = 0
92 mpi.size = 0
92 mpi.size = 0
93 """
93 """
94
94
95 class MPI(Configurable):
95 class MPI(Configurable):
96 """Configurable for MPI initialization"""
96 """Configurable for MPI initialization"""
97 use = Unicode('', config=True,
97 use = Unicode('', config=True,
98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 )
99 )
100
100
101 def _use_changed(self, name, old, new):
101 def _use_changed(self, name, old, new):
102 # load default init script if it's not set
102 # load default init script if it's not set
103 if not self.init_script:
103 if not self.init_script:
104 self.init_script = self.default_inits.get(new, '')
104 self.init_script = self.default_inits.get(new, '')
105
105
106 init_script = Unicode('', config=True,
106 init_script = Unicode('', config=True,
107 help="Initialization code for MPI")
107 help="Initialization code for MPI")
108
108
109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 config=True)
110 config=True)
111
111
112
112
113 #-----------------------------------------------------------------------------
113 #-----------------------------------------------------------------------------
114 # Main application
114 # Main application
115 #-----------------------------------------------------------------------------
115 #-----------------------------------------------------------------------------
116 aliases = dict(
116 aliases = dict(
117 file = 'IPEngineApp.url_file',
117 file = 'IPEngineApp.url_file',
118 c = 'IPEngineApp.startup_command',
118 c = 'IPEngineApp.startup_command',
119 s = 'IPEngineApp.startup_script',
119 s = 'IPEngineApp.startup_script',
120
120
121 url = 'EngineFactory.url',
121 url = 'EngineFactory.url',
122 ssh = 'EngineFactory.sshserver',
122 ssh = 'EngineFactory.sshserver',
123 sshkey = 'EngineFactory.sshkey',
123 sshkey = 'EngineFactory.sshkey',
124 ip = 'EngineFactory.ip',
124 ip = 'EngineFactory.ip',
125 transport = 'EngineFactory.transport',
125 transport = 'EngineFactory.transport',
126 port = 'EngineFactory.regport',
126 port = 'EngineFactory.regport',
127 location = 'EngineFactory.location',
127 location = 'EngineFactory.location',
128
128
129 timeout = 'EngineFactory.timeout',
129 timeout = 'EngineFactory.timeout',
130
130
131 mpi = 'MPI.use',
131 mpi = 'MPI.use',
132
132
133 )
133 )
134 aliases.update(base_aliases)
134 aliases.update(base_aliases)
135 aliases.update(session_aliases)
135 aliases.update(session_aliases)
136 flags = {}
136 flags = {}
137 flags.update(base_flags)
137 flags.update(base_flags)
138 flags.update(session_flags)
138 flags.update(session_flags)
139
139
140 class IPEngineApp(BaseParallelApplication):
140 class IPEngineApp(BaseParallelApplication):
141
141
142 name = 'ipengine'
142 name = 'ipengine'
143 description = _description
143 description = _description
144 examples = _examples
144 examples = _examples
145 config_file_name = Unicode(default_config_file_name)
145 config_file_name = Unicode(default_config_file_name)
146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
147
147
148 startup_script = Unicode(u'', config=True,
148 startup_script = Unicode(u'', config=True,
149 help='specify a script to be run at startup')
149 help='specify a script to be run at startup')
150 startup_command = Unicode('', config=True,
150 startup_command = Unicode('', config=True,
151 help='specify a command to be run at startup')
151 help='specify a command to be run at startup')
152
152
153 url_file = Unicode(u'', config=True,
153 url_file = Unicode(u'', config=True,
154 help="""The full location of the file containing the connection information for
154 help="""The full location of the file containing the connection information for
155 the controller. If this is not given, the file must be in the
155 the controller. If this is not given, the file must be in the
156 security directory of the cluster directory. This location is
156 security directory of the cluster directory. This location is
157 resolved using the `profile` or `profile_dir` options.""",
157 resolved using the `profile` or `profile_dir` options.""",
158 )
158 )
159 wait_for_url_file = Float(5, config=True,
159 wait_for_url_file = Float(5, config=True,
160 help="""The maximum number of seconds to wait for url_file to exist.
160 help="""The maximum number of seconds to wait for url_file to exist.
161 This is useful for batch-systems and shared-filesystems where the
161 This is useful for batch-systems and shared-filesystems where the
162 controller and engine are started at the same time and it
162 controller and engine are started at the same time and it
163 may take a moment for the controller to write the connector files.""")
163 may take a moment for the controller to write the connector files.""")
164
164
165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166
166
167 def _cluster_id_changed(self, name, old, new):
167 def _cluster_id_changed(self, name, old, new):
168 if new:
168 if new:
169 base = 'ipcontroller-%s' % new
169 base = 'ipcontroller-%s' % new
170 else:
170 else:
171 base = 'ipcontroller'
171 base = 'ipcontroller'
172 self.url_file_name = "%s-engine.json" % base
172 self.url_file_name = "%s-engine.json" % base
173
173
174 log_url = Unicode('', config=True,
174 log_url = Unicode('', config=True,
175 help="""The URL for the iploggerapp instance, for forwarding
175 help="""The URL for the iploggerapp instance, for forwarding
176 logging to a central location.""")
176 logging to a central location.""")
177
178 # an IPKernelApp instance, used to setup listening for shell frontends
179 kernel_app = Instance(IPKernelApp)
177
180
178 aliases = Dict(aliases)
181 aliases = Dict(aliases)
179 flags = Dict(flags)
182 flags = Dict(flags)
180
183
181 @property
184 @property
182 def kernel(self):
185 def kernel(self):
183 """allow access to the Kernel object, so I look like IPKernelApp"""
186 """allow access to the Kernel object, so I look like IPKernelApp"""
184 return self.engine.kernel
187 return self.engine.kernel
185
188
186 def find_url_file(self):
189 def find_url_file(self):
187 """Set the url file.
190 """Set the url file.
188
191
189 Here we don't try to actually see if it exists for is valid as that
192 Here we don't try to actually see if it exists for is valid as that
190 is hadled by the connection logic.
193 is hadled by the connection logic.
191 """
194 """
192 config = self.config
195 config = self.config
193 # Find the actual controller key file
196 # Find the actual controller key file
194 if not self.url_file:
197 if not self.url_file:
195 self.url_file = os.path.join(
198 self.url_file = os.path.join(
196 self.profile_dir.security_dir,
199 self.profile_dir.security_dir,
197 self.url_file_name
200 self.url_file_name
198 )
201 )
199
202
200 def load_connector_file(self):
203 def load_connector_file(self):
201 """load config from a JSON connector file,
204 """load config from a JSON connector file,
202 at a *lower* priority than command-line/config files.
205 at a *lower* priority than command-line/config files.
203 """
206 """
204
207
205 self.log.info("Loading url_file %r", self.url_file)
208 self.log.info("Loading url_file %r", self.url_file)
206 config = self.config
209 config = self.config
207
210
208 with open(self.url_file) as f:
211 with open(self.url_file) as f:
209 d = json.loads(f.read())
212 d = json.loads(f.read())
210
213
211 if 'exec_key' in d:
214 if 'exec_key' in d:
212 config.Session.key = cast_bytes(d['exec_key'])
215 config.Session.key = cast_bytes(d['exec_key'])
213
216
214 try:
217 try:
215 config.EngineFactory.location
218 config.EngineFactory.location
216 except AttributeError:
219 except AttributeError:
217 config.EngineFactory.location = d['location']
220 config.EngineFactory.location = d['location']
218
221
219 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
220 try:
223 try:
221 config.EngineFactory.url
224 config.EngineFactory.url
222 except AttributeError:
225 except AttributeError:
223 config.EngineFactory.url = d['url']
226 config.EngineFactory.url = d['url']
224
227
225 try:
228 try:
226 config.EngineFactory.sshserver
229 config.EngineFactory.sshserver
227 except AttributeError:
230 except AttributeError:
228 config.EngineFactory.sshserver = d['ssh']
231 config.EngineFactory.sshserver = d['ssh']
232
233 def listen_kernel(self):
234 """setup engine as listening Kernel, for frontends"""
235 if self.kernel_app is not None:
236 return
237
238 self.log.info("Opening ports for direct connections")
239
240 kernel = self.kernel
241
242 app = self.kernel_app = IPKernelApp(log=self.log, config=self.config,
243 profile_dir = self.profile_dir,
244 session=self.engine.session,
245 )
246 app.init_connection_file()
247 # relevant contents of init_sockets:
248
249 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
250 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
229
251
252 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
253 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
254
255 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
256 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
257 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
258
259 # start the heartbeat, and log connection info:
260
261 app.init_heartbeat()
262
263 app.log_connection_info()
264 app.write_connection_file()
265
266
230 def init_engine(self):
267 def init_engine(self):
231 # This is the working dir by now.
268 # This is the working dir by now.
232 sys.path.insert(0, '')
269 sys.path.insert(0, '')
233 config = self.config
270 config = self.config
234 # print config
271 # print config
235 self.find_url_file()
272 self.find_url_file()
236
273
237 # was the url manually specified?
274 # was the url manually specified?
238 keys = set(self.config.EngineFactory.keys())
275 keys = set(self.config.EngineFactory.keys())
239 keys = keys.union(set(self.config.RegistrationFactory.keys()))
276 keys = keys.union(set(self.config.RegistrationFactory.keys()))
240
277
241 if keys.intersection(set(['ip', 'url', 'port'])):
278 if keys.intersection(set(['ip', 'url', 'port'])):
242 # Connection info was specified, don't wait for the file
279 # Connection info was specified, don't wait for the file
243 url_specified = True
280 url_specified = True
244 self.wait_for_url_file = 0
281 self.wait_for_url_file = 0
245 else:
282 else:
246 url_specified = False
283 url_specified = False
247
284
248 if self.wait_for_url_file and not os.path.exists(self.url_file):
285 if self.wait_for_url_file and not os.path.exists(self.url_file):
249 self.log.warn("url_file %r not found", self.url_file)
286 self.log.warn("url_file %r not found", self.url_file)
250 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
287 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
251 tic = time.time()
288 tic = time.time()
252 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
289 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
253 # wait for url_file to exist, or until time limit
290 # wait for url_file to exist, or until time limit
254 time.sleep(0.1)
291 time.sleep(0.1)
255
292
256 if os.path.exists(self.url_file):
293 if os.path.exists(self.url_file):
257 self.load_connector_file()
294 self.load_connector_file()
258 elif not url_specified:
295 elif not url_specified:
259 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
296 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
260 self.exit(1)
297 self.exit(1)
261
298
262
299
263 try:
300 try:
264 exec_lines = config.Kernel.exec_lines
301 exec_lines = config.Kernel.exec_lines
265 except AttributeError:
302 except AttributeError:
266 config.Kernel.exec_lines = []
303 config.Kernel.exec_lines = []
267 exec_lines = config.Kernel.exec_lines
304 exec_lines = config.Kernel.exec_lines
268
305
269 if self.startup_script:
306 if self.startup_script:
270 enc = sys.getfilesystemencoding() or 'utf8'
307 enc = sys.getfilesystemencoding() or 'utf8'
271 cmd="execfile(%r)" % self.startup_script.encode(enc)
308 cmd="execfile(%r)" % self.startup_script.encode(enc)
272 exec_lines.append(cmd)
309 exec_lines.append(cmd)
273 if self.startup_command:
310 if self.startup_command:
274 exec_lines.append(self.startup_command)
311 exec_lines.append(self.startup_command)
275
312
276 # Create the underlying shell class and Engine
313 # Create the underlying shell class and Engine
277 # shell_class = import_item(self.master_config.Global.shell_class)
314 # shell_class = import_item(self.master_config.Global.shell_class)
278 # print self.config
315 # print self.config
279 try:
316 try:
280 self.engine = EngineFactory(config=config, log=self.log)
317 self.engine = EngineFactory(config=config, log=self.log)
281 except:
318 except:
282 self.log.error("Couldn't start the Engine", exc_info=True)
319 self.log.error("Couldn't start the Engine", exc_info=True)
283 self.exit(1)
320 self.exit(1)
284
321
285 def forward_logging(self):
322 def forward_logging(self):
286 if self.log_url:
323 if self.log_url:
287 self.log.info("Forwarding logging to %s", self.log_url)
324 self.log.info("Forwarding logging to %s", self.log_url)
288 context = self.engine.context
325 context = self.engine.context
289 lsock = context.socket(zmq.PUB)
326 lsock = context.socket(zmq.PUB)
290 lsock.connect(self.log_url)
327 lsock.connect(self.log_url)
291 handler = EnginePUBHandler(self.engine, lsock)
328 handler = EnginePUBHandler(self.engine, lsock)
292 handler.setLevel(self.log_level)
329 handler.setLevel(self.log_level)
293 self.log.addHandler(handler)
330 self.log.addHandler(handler)
294
331
295 def init_mpi(self):
332 def init_mpi(self):
296 global mpi
333 global mpi
297 self.mpi = MPI(config=self.config)
334 self.mpi = MPI(config=self.config)
298
335
299 mpi_import_statement = self.mpi.init_script
336 mpi_import_statement = self.mpi.init_script
300 if mpi_import_statement:
337 if mpi_import_statement:
301 try:
338 try:
302 self.log.info("Initializing MPI:")
339 self.log.info("Initializing MPI:")
303 self.log.info(mpi_import_statement)
340 self.log.info(mpi_import_statement)
304 exec mpi_import_statement in globals()
341 exec mpi_import_statement in globals()
305 except:
342 except:
306 mpi = None
343 mpi = None
307 else:
344 else:
308 mpi = None
345 mpi = None
309
346
310 @catch_config_error
347 @catch_config_error
311 def initialize(self, argv=None):
348 def initialize(self, argv=None):
312 super(IPEngineApp, self).initialize(argv)
349 super(IPEngineApp, self).initialize(argv)
313 self.init_mpi()
350 self.init_mpi()
314 self.init_engine()
351 self.init_engine()
315 self.forward_logging()
352 self.forward_logging()
316
353
317 def start(self):
354 def start(self):
318 self.engine.start()
355 self.engine.start()
319 try:
356 try:
320 self.engine.loop.start()
357 self.engine.loop.start()
321 except KeyboardInterrupt:
358 except KeyboardInterrupt:
322 self.log.critical("Engine Interrupted, shutting down...\n")
359 self.log.critical("Engine Interrupted, shutting down...\n")
323
360
324
361
325 def launch_new_instance():
362 def launch_new_instance():
326 """Create and run the IPython engine"""
363 """Create and run the IPython engine"""
327 app = IPEngineApp.instance()
364 app = IPEngineApp.instance()
328 app.initialize()
365 app.initialize()
329 app.start()
366 app.start()
330
367
331
368
332 if __name__ == '__main__':
369 if __name__ == '__main__':
333 launch_new_instance()
370 launch_new_instance()
334
371
General Comments 0
You need to be logged in to leave comments. Login now