##// END OF EJS Templates
Backport PR #2214: use KernelApp.exec_lines/files in IPEngineApp...
MinRK -
Show More
@@ -1,377 +1,384 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 catch_config_error,
37 catch_config_error,
38 )
38 )
39 from IPython.zmq.log import EnginePUBHandler
39 from IPython.zmq.log import EnginePUBHandler
40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
41 from IPython.zmq.session import (
41 from IPython.zmq.session import (
42 Session, session_aliases, session_flags
42 Session, session_aliases, session_flags
43 )
43 )
44
44
45 from IPython.config.configurable import Configurable
45 from IPython.config.configurable import Configurable
46
46
47 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url
48 from IPython.parallel.util import disambiguate_url
49
49
50 from IPython.utils.importstring import import_item
50 from IPython.utils.importstring import import_item
51 from IPython.utils.py3compat import cast_bytes
51 from IPython.utils.py3compat import cast_bytes
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
53
53
54
54
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56 # Module level variables
56 # Module level variables
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58
58
59 #: The default config file name for this application
59 #: The default config file name for this application
60 default_config_file_name = u'ipengine_config.py'
60 default_config_file_name = u'ipengine_config.py'
61
61
62 _description = """Start an IPython engine for parallel computing.
62 _description = """Start an IPython engine for parallel computing.
63
63
64 IPython engines run in parallel and perform computations on behalf of a client
64 IPython engines run in parallel and perform computations on behalf of a client
65 and controller. A controller needs to be started before the engines. The
65 and controller. A controller needs to be started before the engines. The
66 engine can be configured using command line options or using a cluster
66 engine can be configured using command line options or using a cluster
67 directory. Cluster directories contain config, log and security files and are
67 directory. Cluster directories contain config, log and security files and are
68 usually located in your ipython directory and named as "profile_name".
68 usually located in your ipython directory and named as "profile_name".
69 See the `profile` and `profile-dir` options for details.
69 See the `profile` and `profile-dir` options for details.
70 """
70 """
71
71
72 _examples = """
72 _examples = """
73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 """
75 """
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # MPI configuration
78 # MPI configuration
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80
80
81 mpi4py_init = """from mpi4py import MPI as mpi
81 mpi4py_init = """from mpi4py import MPI as mpi
82 mpi.size = mpi.COMM_WORLD.Get_size()
82 mpi.size = mpi.COMM_WORLD.Get_size()
83 mpi.rank = mpi.COMM_WORLD.Get_rank()
83 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 """
84 """
85
85
86
86
87 pytrilinos_init = """from PyTrilinos import Epetra
87 pytrilinos_init = """from PyTrilinos import Epetra
88 class SimpleStruct:
88 class SimpleStruct:
89 pass
89 pass
90 mpi = SimpleStruct()
90 mpi = SimpleStruct()
91 mpi.rank = 0
91 mpi.rank = 0
92 mpi.size = 0
92 mpi.size = 0
93 """
93 """
94
94
95 class MPI(Configurable):
95 class MPI(Configurable):
96 """Configurable for MPI initialization"""
96 """Configurable for MPI initialization"""
97 use = Unicode('', config=True,
97 use = Unicode('', config=True,
98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 )
99 )
100
100
101 def _use_changed(self, name, old, new):
101 def _use_changed(self, name, old, new):
102 # load default init script if it's not set
102 # load default init script if it's not set
103 if not self.init_script:
103 if not self.init_script:
104 self.init_script = self.default_inits.get(new, '')
104 self.init_script = self.default_inits.get(new, '')
105
105
106 init_script = Unicode('', config=True,
106 init_script = Unicode('', config=True,
107 help="Initialization code for MPI")
107 help="Initialization code for MPI")
108
108
109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 config=True)
110 config=True)
111
111
112
112
113 #-----------------------------------------------------------------------------
113 #-----------------------------------------------------------------------------
114 # Main application
114 # Main application
115 #-----------------------------------------------------------------------------
115 #-----------------------------------------------------------------------------
116 aliases = dict(
116 aliases = dict(
117 file = 'IPEngineApp.url_file',
117 file = 'IPEngineApp.url_file',
118 c = 'IPEngineApp.startup_command',
118 c = 'IPEngineApp.startup_command',
119 s = 'IPEngineApp.startup_script',
119 s = 'IPEngineApp.startup_script',
120
120
121 url = 'EngineFactory.url',
121 url = 'EngineFactory.url',
122 ssh = 'EngineFactory.sshserver',
122 ssh = 'EngineFactory.sshserver',
123 sshkey = 'EngineFactory.sshkey',
123 sshkey = 'EngineFactory.sshkey',
124 ip = 'EngineFactory.ip',
124 ip = 'EngineFactory.ip',
125 transport = 'EngineFactory.transport',
125 transport = 'EngineFactory.transport',
126 port = 'EngineFactory.regport',
126 port = 'EngineFactory.regport',
127 location = 'EngineFactory.location',
127 location = 'EngineFactory.location',
128
128
129 timeout = 'EngineFactory.timeout',
129 timeout = 'EngineFactory.timeout',
130
130
131 mpi = 'MPI.use',
131 mpi = 'MPI.use',
132
132
133 )
133 )
134 aliases.update(base_aliases)
134 aliases.update(base_aliases)
135 aliases.update(session_aliases)
135 aliases.update(session_aliases)
136 flags = {}
136 flags = {}
137 flags.update(base_flags)
137 flags.update(base_flags)
138 flags.update(session_flags)
138 flags.update(session_flags)
139
139
140 class IPEngineApp(BaseParallelApplication):
140 class IPEngineApp(BaseParallelApplication):
141
141
142 name = 'ipengine'
142 name = 'ipengine'
143 description = _description
143 description = _description
144 examples = _examples
144 examples = _examples
145 config_file_name = Unicode(default_config_file_name)
145 config_file_name = Unicode(default_config_file_name)
146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
147
147
148 startup_script = Unicode(u'', config=True,
148 startup_script = Unicode(u'', config=True,
149 help='specify a script to be run at startup')
149 help='specify a script to be run at startup')
150 startup_command = Unicode('', config=True,
150 startup_command = Unicode('', config=True,
151 help='specify a command to be run at startup')
151 help='specify a command to be run at startup')
152
152
153 url_file = Unicode(u'', config=True,
153 url_file = Unicode(u'', config=True,
154 help="""The full location of the file containing the connection information for
154 help="""The full location of the file containing the connection information for
155 the controller. If this is not given, the file must be in the
155 the controller. If this is not given, the file must be in the
156 security directory of the cluster directory. This location is
156 security directory of the cluster directory. This location is
157 resolved using the `profile` or `profile_dir` options.""",
157 resolved using the `profile` or `profile_dir` options.""",
158 )
158 )
159 wait_for_url_file = Float(5, config=True,
159 wait_for_url_file = Float(5, config=True,
160 help="""The maximum number of seconds to wait for url_file to exist.
160 help="""The maximum number of seconds to wait for url_file to exist.
161 This is useful for batch-systems and shared-filesystems where the
161 This is useful for batch-systems and shared-filesystems where the
162 controller and engine are started at the same time and it
162 controller and engine are started at the same time and it
163 may take a moment for the controller to write the connector files.""")
163 may take a moment for the controller to write the connector files.""")
164
164
165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166
166
167 def _cluster_id_changed(self, name, old, new):
167 def _cluster_id_changed(self, name, old, new):
168 if new:
168 if new:
169 base = 'ipcontroller-%s' % new
169 base = 'ipcontroller-%s' % new
170 else:
170 else:
171 base = 'ipcontroller'
171 base = 'ipcontroller'
172 self.url_file_name = "%s-engine.json" % base
172 self.url_file_name = "%s-engine.json" % base
173
173
174 log_url = Unicode('', config=True,
174 log_url = Unicode('', config=True,
175 help="""The URL for the iploggerapp instance, for forwarding
175 help="""The URL for the iploggerapp instance, for forwarding
176 logging to a central location.""")
176 logging to a central location.""")
177
177
178 # an IPKernelApp instance, used to setup listening for shell frontends
178 # an IPKernelApp instance, used to setup listening for shell frontends
179 kernel_app = Instance(IPKernelApp)
179 kernel_app = Instance(IPKernelApp)
180
180
181 aliases = Dict(aliases)
181 aliases = Dict(aliases)
182 flags = Dict(flags)
182 flags = Dict(flags)
183
183
184 @property
184 @property
185 def kernel(self):
185 def kernel(self):
186 """allow access to the Kernel object, so I look like IPKernelApp"""
186 """allow access to the Kernel object, so I look like IPKernelApp"""
187 return self.engine.kernel
187 return self.engine.kernel
188
188
189 def find_url_file(self):
189 def find_url_file(self):
190 """Set the url file.
190 """Set the url file.
191
191
192 Here we don't try to actually see if it exists for is valid as that
192 Here we don't try to actually see if it exists for is valid as that
193 is hadled by the connection logic.
193 is hadled by the connection logic.
194 """
194 """
195 config = self.config
195 config = self.config
196 # Find the actual controller key file
196 # Find the actual controller key file
197 if not self.url_file:
197 if not self.url_file:
198 self.url_file = os.path.join(
198 self.url_file = os.path.join(
199 self.profile_dir.security_dir,
199 self.profile_dir.security_dir,
200 self.url_file_name
200 self.url_file_name
201 )
201 )
202
202
203 def load_connector_file(self):
203 def load_connector_file(self):
204 """load config from a JSON connector file,
204 """load config from a JSON connector file,
205 at a *lower* priority than command-line/config files.
205 at a *lower* priority than command-line/config files.
206 """
206 """
207
207
208 self.log.info("Loading url_file %r", self.url_file)
208 self.log.info("Loading url_file %r", self.url_file)
209 config = self.config
209 config = self.config
210
210
211 with open(self.url_file) as f:
211 with open(self.url_file) as f:
212 d = json.loads(f.read())
212 d = json.loads(f.read())
213
213
214 if 'exec_key' in d:
214 if 'exec_key' in d:
215 config.Session.key = cast_bytes(d['exec_key'])
215 config.Session.key = cast_bytes(d['exec_key'])
216
216
217 try:
217 try:
218 config.EngineFactory.location
218 config.EngineFactory.location
219 except AttributeError:
219 except AttributeError:
220 config.EngineFactory.location = d['location']
220 config.EngineFactory.location = d['location']
221
221
222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 try:
223 try:
224 config.EngineFactory.url
224 config.EngineFactory.url
225 except AttributeError:
225 except AttributeError:
226 config.EngineFactory.url = d['url']
226 config.EngineFactory.url = d['url']
227
227
228 try:
228 try:
229 config.EngineFactory.sshserver
229 config.EngineFactory.sshserver
230 except AttributeError:
230 except AttributeError:
231 config.EngineFactory.sshserver = d['ssh']
231 config.EngineFactory.sshserver = d['ssh']
232
232
233 def bind_kernel(self, **kwargs):
233 def bind_kernel(self, **kwargs):
234 """Promote engine to listening kernel, accessible to frontends."""
234 """Promote engine to listening kernel, accessible to frontends."""
235 if self.kernel_app is not None:
235 if self.kernel_app is not None:
236 return
236 return
237
237
238 self.log.info("Opening ports for direct connections as an IPython kernel")
238 self.log.info("Opening ports for direct connections as an IPython kernel")
239
239
240 kernel = self.kernel
240 kernel = self.kernel
241
241
242 kwargs.setdefault('config', self.config)
242 kwargs.setdefault('config', self.config)
243 kwargs.setdefault('log', self.log)
243 kwargs.setdefault('log', self.log)
244 kwargs.setdefault('profile_dir', self.profile_dir)
244 kwargs.setdefault('profile_dir', self.profile_dir)
245 kwargs.setdefault('session', self.engine.session)
245 kwargs.setdefault('session', self.engine.session)
246
246
247 app = self.kernel_app = IPKernelApp(**kwargs)
247 app = self.kernel_app = IPKernelApp(**kwargs)
248
248
249 # allow IPKernelApp.instance():
249 # allow IPKernelApp.instance():
250 IPKernelApp._instance = app
250 IPKernelApp._instance = app
251
251
252 app.init_connection_file()
252 app.init_connection_file()
253 # relevant contents of init_sockets:
253 # relevant contents of init_sockets:
254
254
255 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
255 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
256 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
256 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
257
257
258 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
258 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
259 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
259 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
260
260
261 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
261 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
262 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
262 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
263 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
263 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
264
264
265 # start the heartbeat, and log connection info:
265 # start the heartbeat, and log connection info:
266
266
267 app.init_heartbeat()
267 app.init_heartbeat()
268
268
269 app.log_connection_info()
269 app.log_connection_info()
270 app.write_connection_file()
270 app.write_connection_file()
271
271
272
272
273 def init_engine(self):
273 def init_engine(self):
274 # This is the working dir by now.
274 # This is the working dir by now.
275 sys.path.insert(0, '')
275 sys.path.insert(0, '')
276 config = self.config
276 config = self.config
277 # print config
277 # print config
278 self.find_url_file()
278 self.find_url_file()
279
279
280 # was the url manually specified?
280 # was the url manually specified?
281 keys = set(self.config.EngineFactory.keys())
281 keys = set(self.config.EngineFactory.keys())
282 keys = keys.union(set(self.config.RegistrationFactory.keys()))
282 keys = keys.union(set(self.config.RegistrationFactory.keys()))
283
283
284 if keys.intersection(set(['ip', 'url', 'port'])):
284 if keys.intersection(set(['ip', 'url', 'port'])):
285 # Connection info was specified, don't wait for the file
285 # Connection info was specified, don't wait for the file
286 url_specified = True
286 url_specified = True
287 self.wait_for_url_file = 0
287 self.wait_for_url_file = 0
288 else:
288 else:
289 url_specified = False
289 url_specified = False
290
290
291 if self.wait_for_url_file and not os.path.exists(self.url_file):
291 if self.wait_for_url_file and not os.path.exists(self.url_file):
292 self.log.warn("url_file %r not found", self.url_file)
292 self.log.warn("url_file %r not found", self.url_file)
293 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
293 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
294 tic = time.time()
294 tic = time.time()
295 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
295 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
296 # wait for url_file to exist, or until time limit
296 # wait for url_file to exist, or until time limit
297 time.sleep(0.1)
297 time.sleep(0.1)
298
298
299 if os.path.exists(self.url_file):
299 if os.path.exists(self.url_file):
300 self.load_connector_file()
300 self.load_connector_file()
301 elif not url_specified:
301 elif not url_specified:
302 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
302 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
303 self.exit(1)
303 self.exit(1)
304
304
305
305
306 try:
306 try:
307 exec_lines = config.Kernel.exec_lines
307 exec_lines = config.IPKernelApp.exec_lines
308 except AttributeError:
308 except AttributeError:
309 config.Kernel.exec_lines = []
309 try:
310 exec_lines = config.Kernel.exec_lines
310 exec_lines = config.InteractiveShellApp.exec_lines
311 except AttributeError:
312 exec_lines = config.IPKernelApp.exec_lines = []
313 try:
314 exec_files = config.IPKernelApp.exec_files
315 except AttributeError:
316 try:
317 exec_files = config.InteractiveShellApp.exec_files
318 except AttributeError:
319 exec_files = config.IPKernelApp.exec_files = []
311
320
312 if self.startup_script:
321 if self.startup_script:
313 enc = sys.getfilesystemencoding() or 'utf8'
322 exec_files.append(self.startup_script)
314 cmd="execfile(%r)" % self.startup_script.encode(enc)
315 exec_lines.append(cmd)
316 if self.startup_command:
323 if self.startup_command:
317 exec_lines.append(self.startup_command)
324 exec_lines.append(self.startup_command)
318
325
319 # Create the underlying shell class and Engine
326 # Create the underlying shell class and Engine
320 # shell_class = import_item(self.master_config.Global.shell_class)
327 # shell_class = import_item(self.master_config.Global.shell_class)
321 # print self.config
328 # print self.config
322 try:
329 try:
323 self.engine = EngineFactory(config=config, log=self.log)
330 self.engine = EngineFactory(config=config, log=self.log)
324 except:
331 except:
325 self.log.error("Couldn't start the Engine", exc_info=True)
332 self.log.error("Couldn't start the Engine", exc_info=True)
326 self.exit(1)
333 self.exit(1)
327
334
328 def forward_logging(self):
335 def forward_logging(self):
329 if self.log_url:
336 if self.log_url:
330 self.log.info("Forwarding logging to %s", self.log_url)
337 self.log.info("Forwarding logging to %s", self.log_url)
331 context = self.engine.context
338 context = self.engine.context
332 lsock = context.socket(zmq.PUB)
339 lsock = context.socket(zmq.PUB)
333 lsock.connect(self.log_url)
340 lsock.connect(self.log_url)
334 handler = EnginePUBHandler(self.engine, lsock)
341 handler = EnginePUBHandler(self.engine, lsock)
335 handler.setLevel(self.log_level)
342 handler.setLevel(self.log_level)
336 self.log.addHandler(handler)
343 self.log.addHandler(handler)
337
344
338 def init_mpi(self):
345 def init_mpi(self):
339 global mpi
346 global mpi
340 self.mpi = MPI(config=self.config)
347 self.mpi = MPI(config=self.config)
341
348
342 mpi_import_statement = self.mpi.init_script
349 mpi_import_statement = self.mpi.init_script
343 if mpi_import_statement:
350 if mpi_import_statement:
344 try:
351 try:
345 self.log.info("Initializing MPI:")
352 self.log.info("Initializing MPI:")
346 self.log.info(mpi_import_statement)
353 self.log.info(mpi_import_statement)
347 exec mpi_import_statement in globals()
354 exec mpi_import_statement in globals()
348 except:
355 except:
349 mpi = None
356 mpi = None
350 else:
357 else:
351 mpi = None
358 mpi = None
352
359
353 @catch_config_error
360 @catch_config_error
354 def initialize(self, argv=None):
361 def initialize(self, argv=None):
355 super(IPEngineApp, self).initialize(argv)
362 super(IPEngineApp, self).initialize(argv)
356 self.init_mpi()
363 self.init_mpi()
357 self.init_engine()
364 self.init_engine()
358 self.forward_logging()
365 self.forward_logging()
359
366
360 def start(self):
367 def start(self):
361 self.engine.start()
368 self.engine.start()
362 try:
369 try:
363 self.engine.loop.start()
370 self.engine.loop.start()
364 except KeyboardInterrupt:
371 except KeyboardInterrupt:
365 self.log.critical("Engine Interrupted, shutting down...\n")
372 self.log.critical("Engine Interrupted, shutting down...\n")
366
373
367
374
368 def launch_new_instance():
375 def launch_new_instance():
369 """Create and run the IPython engine"""
376 """Create and run the IPython engine"""
370 app = IPEngineApp.instance()
377 app = IPEngineApp.instance()
371 app.initialize()
378 app.initialize()
372 app.start()
379 app.start()
373
380
374
381
375 if __name__ == '__main__':
382 if __name__ == '__main__':
376 launch_new_instance()
383 launch_new_instance()
377
384
@@ -1,237 +1,242 b''
1 """A simple engine that talks to a controller over 0MQ.
1 """A simple engine that talks to a controller over 0MQ.
2 it handles registration, etc. and launches a kernel
2 it handles registration, etc. and launches a kernel
3 connected to the Controller's Schedulers.
3 connected to the Controller's Schedulers.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from getpass import getpass
20 from getpass import getpass
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.external.ssh import tunnel
25 from IPython.external.ssh import tunnel
26 # internal
26 # internal
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 )
29 )
30 from IPython.utils.py3compat import cast_bytes
30 from IPython.utils.py3compat import cast_bytes
31
31
32 from IPython.parallel.controller.heartmonitor import Heart
32 from IPython.parallel.controller.heartmonitor import Heart
33 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
34 from IPython.parallel.util import disambiguate_url
34 from IPython.parallel.util import disambiguate_url
35
35
36 from IPython.zmq.session import Message
36 from IPython.zmq.session import Message
37 from IPython.zmq.ipkernel import Kernel
37 from IPython.zmq.ipkernel import Kernel, IPKernelApp
38
38
39 class EngineFactory(RegistrationFactory):
39 class EngineFactory(RegistrationFactory):
40 """IPython engine"""
40 """IPython engine"""
41
41
42 # configurables:
42 # configurables:
43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 help="""The OutStream for handling stdout/err.
44 help="""The OutStream for handling stdout/err.
45 Typically 'IPython.zmq.iostream.OutStream'""")
45 Typically 'IPython.zmq.iostream.OutStream'""")
46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 help="""The class for handling displayhook.
47 help="""The class for handling displayhook.
48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 location=Unicode(config=True,
49 location=Unicode(config=True,
50 help="""The location (an IP address) of the controller. This is
50 help="""The location (an IP address) of the controller. This is
51 used for disambiguating URLs, to determine whether
51 used for disambiguating URLs, to determine whether
52 loopback should be used to connect or the public address.""")
52 loopback should be used to connect or the public address.""")
53 timeout=CFloat(2,config=True,
53 timeout=CFloat(2,config=True,
54 help="""The time (in seconds) to wait for the Controller to respond
54 help="""The time (in seconds) to wait for the Controller to respond
55 to registration requests before giving up.""")
55 to registration requests before giving up.""")
56 sshserver=Unicode(config=True,
56 sshserver=Unicode(config=True,
57 help="""The SSH server to use for tunneling connections to the Controller.""")
57 help="""The SSH server to use for tunneling connections to the Controller.""")
58 sshkey=Unicode(config=True,
58 sshkey=Unicode(config=True,
59 help="""The SSH private key file to use when tunneling connections to the Controller.""")
59 help="""The SSH private key file to use when tunneling connections to the Controller.""")
60 paramiko=Bool(sys.platform == 'win32', config=True,
60 paramiko=Bool(sys.platform == 'win32', config=True,
61 help="""Whether to use paramiko instead of openssh for tunnels.""")
61 help="""Whether to use paramiko instead of openssh for tunnels.""")
62
62
63 # not configurable:
63 # not configurable:
64 user_ns=Dict()
64 user_ns=Dict()
65 id=Integer(allow_none=True)
65 id=Integer(allow_none=True)
66 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
66 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 kernel=Instance(Kernel)
67 kernel=Instance(Kernel)
68
68
69 bident = CBytes()
69 bident = CBytes()
70 ident = Unicode()
70 ident = Unicode()
71 def _ident_changed(self, name, old, new):
71 def _ident_changed(self, name, old, new):
72 self.bident = cast_bytes(new)
72 self.bident = cast_bytes(new)
73 using_ssh=Bool(False)
73 using_ssh=Bool(False)
74
74
75
75
76 def __init__(self, **kwargs):
76 def __init__(self, **kwargs):
77 super(EngineFactory, self).__init__(**kwargs)
77 super(EngineFactory, self).__init__(**kwargs)
78 self.ident = self.session.session
78 self.ident = self.session.session
79
79
80 def init_connector(self):
80 def init_connector(self):
81 """construct connection function, which handles tunnels."""
81 """construct connection function, which handles tunnels."""
82 self.using_ssh = bool(self.sshkey or self.sshserver)
82 self.using_ssh = bool(self.sshkey or self.sshserver)
83
83
84 if self.sshkey and not self.sshserver:
84 if self.sshkey and not self.sshserver:
85 # We are using ssh directly to the controller, tunneling localhost to localhost
85 # We are using ssh directly to the controller, tunneling localhost to localhost
86 self.sshserver = self.url.split('://')[1].split(':')[0]
86 self.sshserver = self.url.split('://')[1].split(':')[0]
87
87
88 if self.using_ssh:
88 if self.using_ssh:
89 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
89 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
90 password=False
90 password=False
91 else:
91 else:
92 password = getpass("SSH Password for %s: "%self.sshserver)
92 password = getpass("SSH Password for %s: "%self.sshserver)
93 else:
93 else:
94 password = False
94 password = False
95
95
96 def connect(s, url):
96 def connect(s, url):
97 url = disambiguate_url(url, self.location)
97 url = disambiguate_url(url, self.location)
98 if self.using_ssh:
98 if self.using_ssh:
99 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
99 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
100 return tunnel.tunnel_connection(s, url, self.sshserver,
100 return tunnel.tunnel_connection(s, url, self.sshserver,
101 keyfile=self.sshkey, paramiko=self.paramiko,
101 keyfile=self.sshkey, paramiko=self.paramiko,
102 password=password,
102 password=password,
103 )
103 )
104 else:
104 else:
105 return s.connect(url)
105 return s.connect(url)
106
106
107 def maybe_tunnel(url):
107 def maybe_tunnel(url):
108 """like connect, but don't complete the connection (for use by heartbeat)"""
108 """like connect, but don't complete the connection (for use by heartbeat)"""
109 url = disambiguate_url(url, self.location)
109 url = disambiguate_url(url, self.location)
110 if self.using_ssh:
110 if self.using_ssh:
111 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
111 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
112 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
112 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 keyfile=self.sshkey, paramiko=self.paramiko,
113 keyfile=self.sshkey, paramiko=self.paramiko,
114 password=password,
114 password=password,
115 )
115 )
116 return url
116 return url
117 return connect, maybe_tunnel
117 return connect, maybe_tunnel
118
118
119 def register(self):
119 def register(self):
120 """send the registration_request"""
120 """send the registration_request"""
121
121
122 self.log.info("Registering with controller at %s"%self.url)
122 self.log.info("Registering with controller at %s"%self.url)
123 ctx = self.context
123 ctx = self.context
124 connect,maybe_tunnel = self.init_connector()
124 connect,maybe_tunnel = self.init_connector()
125 reg = ctx.socket(zmq.DEALER)
125 reg = ctx.socket(zmq.DEALER)
126 reg.setsockopt(zmq.IDENTITY, self.bident)
126 reg.setsockopt(zmq.IDENTITY, self.bident)
127 connect(reg, self.url)
127 connect(reg, self.url)
128 self.registrar = zmqstream.ZMQStream(reg, self.loop)
128 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129
129
130
130
131 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
131 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
132 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 # print (self.session.key)
133 # print (self.session.key)
134 self.session.send(self.registrar, "registration_request",content=content)
134 self.session.send(self.registrar, "registration_request",content=content)
135
135
136 def complete_registration(self, msg, connect, maybe_tunnel):
136 def complete_registration(self, msg, connect, maybe_tunnel):
137 # print msg
137 # print msg
138 self._abort_dc.stop()
138 self._abort_dc.stop()
139 ctx = self.context
139 ctx = self.context
140 loop = self.loop
140 loop = self.loop
141 identity = self.bident
141 identity = self.bident
142 idents,msg = self.session.feed_identities(msg)
142 idents,msg = self.session.feed_identities(msg)
143 msg = Message(self.session.unserialize(msg))
143 msg = Message(self.session.unserialize(msg))
144
144
145 if msg.content.status == 'ok':
145 if msg.content.status == 'ok':
146 self.id = int(msg.content.id)
146 self.id = int(msg.content.id)
147
147
148 # launch heartbeat
148 # launch heartbeat
149 hb_addrs = msg.content.heartbeat
149 hb_addrs = msg.content.heartbeat
150
150
151 # possibly forward hb ports with tunnels
151 # possibly forward hb ports with tunnels
152 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
152 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
153 heart = Heart(*map(str, hb_addrs), heart_id=identity)
153 heart = Heart(*map(str, hb_addrs), heart_id=identity)
154 heart.start()
154 heart.start()
155
155
156 # create Shell Streams (MUX, Task, etc.):
156 # create Shell Streams (MUX, Task, etc.):
157 queue_addr = msg.content.mux
157 queue_addr = msg.content.mux
158 shell_addrs = [ str(queue_addr) ]
158 shell_addrs = [ str(queue_addr) ]
159 task_addr = msg.content.task
159 task_addr = msg.content.task
160 if task_addr:
160 if task_addr:
161 shell_addrs.append(str(task_addr))
161 shell_addrs.append(str(task_addr))
162
162
163 # Uncomment this to go back to two-socket model
163 # Uncomment this to go back to two-socket model
164 # shell_streams = []
164 # shell_streams = []
165 # for addr in shell_addrs:
165 # for addr in shell_addrs:
166 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
166 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 # stream.setsockopt(zmq.IDENTITY, identity)
167 # stream.setsockopt(zmq.IDENTITY, identity)
168 # stream.connect(disambiguate_url(addr, self.location))
168 # stream.connect(disambiguate_url(addr, self.location))
169 # shell_streams.append(stream)
169 # shell_streams.append(stream)
170
170
171 # Now use only one shell stream for mux and tasks
171 # Now use only one shell stream for mux and tasks
172 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
172 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
173 stream.setsockopt(zmq.IDENTITY, identity)
173 stream.setsockopt(zmq.IDENTITY, identity)
174 shell_streams = [stream]
174 shell_streams = [stream]
175 for addr in shell_addrs:
175 for addr in shell_addrs:
176 connect(stream, addr)
176 connect(stream, addr)
177 # end single stream-socket
177 # end single stream-socket
178
178
179 # control stream:
179 # control stream:
180 control_addr = str(msg.content.control)
180 control_addr = str(msg.content.control)
181 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
181 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
182 control_stream.setsockopt(zmq.IDENTITY, identity)
182 control_stream.setsockopt(zmq.IDENTITY, identity)
183 connect(control_stream, control_addr)
183 connect(control_stream, control_addr)
184
184
185 # create iopub stream:
185 # create iopub stream:
186 iopub_addr = msg.content.iopub
186 iopub_addr = msg.content.iopub
187 iopub_socket = ctx.socket(zmq.PUB)
187 iopub_socket = ctx.socket(zmq.PUB)
188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 connect(iopub_socket, iopub_addr)
189 connect(iopub_socket, iopub_addr)
190
190
191 # disable history:
191 # disable history:
192 self.config.HistoryManager.hist_file = ':memory:'
192 self.config.HistoryManager.hist_file = ':memory:'
193
193
194 # Redirect input streams and set a display hook.
194 # Redirect input streams and set a display hook.
195 if self.out_stream_factory:
195 if self.out_stream_factory:
196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
200 if self.display_hook_factory:
200 if self.display_hook_factory:
201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
203
203
204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
206 loop=loop, user_ns=self.user_ns, log=self.log)
206 loop=loop, user_ns=self.user_ns, log=self.log)
207
207 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
208 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
208 self.kernel.start()
209
209
210 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
211 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
212 app.init_profile_dir()
213 app.init_code()
210
214
215 self.kernel.start()
211 else:
216 else:
212 self.log.fatal("Registration Failed: %s"%msg)
217 self.log.fatal("Registration Failed: %s"%msg)
213 raise Exception("Registration Failed: %s"%msg)
218 raise Exception("Registration Failed: %s"%msg)
214
219
215 self.log.info("Completed registration with id %i"%self.id)
220 self.log.info("Completed registration with id %i"%self.id)
216
221
217
222
218 def abort(self):
223 def abort(self):
219 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
224 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
220 if self.url.startswith('127.'):
225 if self.url.startswith('127.'):
221 self.log.fatal("""
226 self.log.fatal("""
222 If the controller and engines are not on the same machine,
227 If the controller and engines are not on the same machine,
223 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
228 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
224 c.HubFactory.ip='*' # for all interfaces, internal and external
229 c.HubFactory.ip='*' # for all interfaces, internal and external
225 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
230 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
226 or tunnel connections via ssh.
231 or tunnel connections via ssh.
227 """)
232 """)
228 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
233 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
229 time.sleep(1)
234 time.sleep(1)
230 sys.exit(255)
235 sys.exit(255)
231
236
232 def start(self):
237 def start(self):
233 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
238 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
234 dc.start()
239 dc.start()
235 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
240 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
236 self._abort_dc.start()
241 self._abort_dc.start()
237
242
General Comments 0
You need to be logged in to leave comments. Login now