##// END OF EJS Templates
split init_sockets into a few steps, so they can be better reused
MinRK -
Show More
@@ -1,313 +1,319 b''
1 """An Application for launching a kernel
1 """An Application for launching a kernel
2
2
3 Authors
3 Authors
4 -------
4 -------
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING.txt, distributed as part of this software.
11 # the file COPYING.txt, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports
18 # Standard library imports
19 import json
19 import json
20 import os
20 import os
21 import sys
21 import sys
22 import signal
22 import signal
23
23
24 # System library imports
24 # System library imports
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 # IPython imports
28 # IPython imports
29 from IPython.core.ultratb import FormattedTB
29 from IPython.core.ultratb import FormattedTB
30 from IPython.core.application import (
30 from IPython.core.application import (
31 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
31 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
32 )
32 )
33 from IPython.utils import io
33 from IPython.utils import io
34 from IPython.utils.localinterfaces import LOCALHOST
34 from IPython.utils.localinterfaces import LOCALHOST
35 from IPython.utils.path import filefind
35 from IPython.utils.path import filefind
36 from IPython.utils.py3compat import str_to_bytes
36 from IPython.utils.py3compat import str_to_bytes
37 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
37 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
38 DottedObjectName)
38 DottedObjectName)
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 # local imports
40 # local imports
41 from IPython.zmq.entry_point import write_connection_file
41 from IPython.zmq.entry_point import write_connection_file
42 from IPython.zmq.heartbeat import Heartbeat
42 from IPython.zmq.heartbeat import Heartbeat
43 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
43 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
44 from IPython.zmq.session import (
44 from IPython.zmq.session import (
45 Session, session_flags, session_aliases, default_secure,
45 Session, session_flags, session_aliases, default_secure,
46 )
46 )
47
47
48
48
49 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
50 # Flags and Aliases
50 # Flags and Aliases
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52
52
53 kernel_aliases = dict(base_aliases)
53 kernel_aliases = dict(base_aliases)
54 kernel_aliases.update({
54 kernel_aliases.update({
55 'ip' : 'KernelApp.ip',
55 'ip' : 'KernelApp.ip',
56 'hb' : 'KernelApp.hb_port',
56 'hb' : 'KernelApp.hb_port',
57 'shell' : 'KernelApp.shell_port',
57 'shell' : 'KernelApp.shell_port',
58 'iopub' : 'KernelApp.iopub_port',
58 'iopub' : 'KernelApp.iopub_port',
59 'stdin' : 'KernelApp.stdin_port',
59 'stdin' : 'KernelApp.stdin_port',
60 'f' : 'KernelApp.connection_file',
60 'f' : 'KernelApp.connection_file',
61 'parent': 'KernelApp.parent',
61 'parent': 'KernelApp.parent',
62 })
62 })
63 if sys.platform.startswith('win'):
63 if sys.platform.startswith('win'):
64 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
64 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
65
65
66 kernel_flags = dict(base_flags)
66 kernel_flags = dict(base_flags)
67 kernel_flags.update({
67 kernel_flags.update({
68 'no-stdout' : (
68 'no-stdout' : (
69 {'KernelApp' : {'no_stdout' : True}},
69 {'KernelApp' : {'no_stdout' : True}},
70 "redirect stdout to the null device"),
70 "redirect stdout to the null device"),
71 'no-stderr' : (
71 'no-stderr' : (
72 {'KernelApp' : {'no_stderr' : True}},
72 {'KernelApp' : {'no_stderr' : True}},
73 "redirect stderr to the null device"),
73 "redirect stderr to the null device"),
74 })
74 })
75
75
76 # inherit flags&aliases for Sessions
76 # inherit flags&aliases for Sessions
77 kernel_aliases.update(session_aliases)
77 kernel_aliases.update(session_aliases)
78 kernel_flags.update(session_flags)
78 kernel_flags.update(session_flags)
79
79
80
80
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # Application class for starting a Kernel
83 # Application class for starting a Kernel
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86 class KernelApp(BaseIPythonApplication):
86 class KernelApp(BaseIPythonApplication):
87 name='ipkernel'
87 name='ipkernel'
88 aliases = Dict(kernel_aliases)
88 aliases = Dict(kernel_aliases)
89 flags = Dict(kernel_flags)
89 flags = Dict(kernel_flags)
90 classes = [Session]
90 classes = [Session]
91 # the kernel class, as an importstring
91 # the kernel class, as an importstring
92 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
92 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
93 kernel = Any()
93 kernel = Any()
94 poller = Any() # don't restrict this even though current pollers are all Threads
94 poller = Any() # don't restrict this even though current pollers are all Threads
95 heartbeat = Instance(Heartbeat)
95 heartbeat = Instance(Heartbeat)
96 session = Instance('IPython.zmq.session.Session')
96 session = Instance('IPython.zmq.session.Session')
97 ports = Dict()
97 ports = Dict()
98
98
99 # inherit config file name from parent:
99 # inherit config file name from parent:
100 parent_appname = Unicode(config=True)
100 parent_appname = Unicode(config=True)
101 def _parent_appname_changed(self, name, old, new):
101 def _parent_appname_changed(self, name, old, new):
102 if self.config_file_specified:
102 if self.config_file_specified:
103 # it was manually specified, ignore
103 # it was manually specified, ignore
104 return
104 return
105 self.config_file_name = new.replace('-','_') + u'_config.py'
105 self.config_file_name = new.replace('-','_') + u'_config.py'
106 # don't let this count as specifying the config file
106 # don't let this count as specifying the config file
107 self.config_file_specified = False
107 self.config_file_specified = False
108
108
109 # connection info:
109 # connection info:
110 ip = Unicode(LOCALHOST, config=True,
110 ip = Unicode(LOCALHOST, config=True,
111 help="Set the IP or interface on which the kernel will listen.")
111 help="Set the IP or interface on which the kernel will listen.")
112 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
112 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
113 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
113 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
114 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
114 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
115 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
115 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
116 connection_file = Unicode('', config=True,
116 connection_file = Unicode('', config=True,
117 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
117 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
118
118
119 This file will contain the IP, ports, and authentication key needed to connect
119 This file will contain the IP, ports, and authentication key needed to connect
120 clients to this kernel. By default, this file will be created in the security-dir
120 clients to this kernel. By default, this file will be created in the security-dir
121 of the current profile, but can be specified by absolute path.
121 of the current profile, but can be specified by absolute path.
122 """)
122 """)
123
123
124 # streams, etc.
124 # streams, etc.
125 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
125 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
126 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
126 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
127 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
127 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
128 config=True, help="The importstring for the OutStream factory")
128 config=True, help="The importstring for the OutStream factory")
129 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
129 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
130 config=True, help="The importstring for the DisplayHook factory")
130 config=True, help="The importstring for the DisplayHook factory")
131
131
132 # polling
132 # polling
133 parent = Integer(0, config=True,
133 parent = Integer(0, config=True,
134 help="""kill this process if its parent dies. On Windows, the argument
134 help="""kill this process if its parent dies. On Windows, the argument
135 specifies the HANDLE of the parent process, otherwise it is simply boolean.
135 specifies the HANDLE of the parent process, otherwise it is simply boolean.
136 """)
136 """)
137 interrupt = Integer(0, config=True,
137 interrupt = Integer(0, config=True,
138 help="""ONLY USED ON WINDOWS
138 help="""ONLY USED ON WINDOWS
139 Interrupt this process when the parent is signalled.
139 Interrupt this process when the parent is signalled.
140 """)
140 """)
141
141
142 def init_crash_handler(self):
142 def init_crash_handler(self):
143 # Install minimal exception handling
143 # Install minimal exception handling
144 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
144 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
145 ostream=sys.__stdout__)
145 ostream=sys.__stdout__)
146
146
147 def init_poller(self):
147 def init_poller(self):
148 if sys.platform == 'win32':
148 if sys.platform == 'win32':
149 if self.interrupt or self.parent:
149 if self.interrupt or self.parent:
150 self.poller = ParentPollerWindows(self.interrupt, self.parent)
150 self.poller = ParentPollerWindows(self.interrupt, self.parent)
151 elif self.parent:
151 elif self.parent:
152 self.poller = ParentPollerUnix()
152 self.poller = ParentPollerUnix()
153
153
154 def _bind_socket(self, s, port):
154 def _bind_socket(self, s, port):
155 iface = 'tcp://%s' % self.ip
155 iface = 'tcp://%s' % self.ip
156 if port <= 0:
156 if port <= 0:
157 port = s.bind_to_random_port(iface)
157 port = s.bind_to_random_port(iface)
158 else:
158 else:
159 s.bind(iface + ':%i'%port)
159 s.bind(iface + ':%i'%port)
160 return port
160 return port
161
161
162 def load_connection_file(self):
162 def load_connection_file(self):
163 """load ip/port/hmac config from JSON connection file"""
163 """load ip/port/hmac config from JSON connection file"""
164 try:
164 try:
165 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
165 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
166 except IOError:
166 except IOError:
167 self.log.debug("Connection file not found: %s", self.connection_file)
167 self.log.debug("Connection file not found: %s", self.connection_file)
168 return
168 return
169 self.log.debug(u"Loading connection file %s", fname)
169 self.log.debug(u"Loading connection file %s", fname)
170 with open(fname) as f:
170 with open(fname) as f:
171 s = f.read()
171 s = f.read()
172 cfg = json.loads(s)
172 cfg = json.loads(s)
173 if self.ip == LOCALHOST and 'ip' in cfg:
173 if self.ip == LOCALHOST and 'ip' in cfg:
174 # not overridden by config or cl_args
174 # not overridden by config or cl_args
175 self.ip = cfg['ip']
175 self.ip = cfg['ip']
176 for channel in ('hb', 'shell', 'iopub', 'stdin'):
176 for channel in ('hb', 'shell', 'iopub', 'stdin'):
177 name = channel + '_port'
177 name = channel + '_port'
178 if getattr(self, name) == 0 and name in cfg:
178 if getattr(self, name) == 0 and name in cfg:
179 # not overridden by config or cl_args
179 # not overridden by config or cl_args
180 setattr(self, name, cfg[name])
180 setattr(self, name, cfg[name])
181 if 'key' in cfg:
181 if 'key' in cfg:
182 self.config.Session.key = str_to_bytes(cfg['key'])
182 self.config.Session.key = str_to_bytes(cfg['key'])
183
183
184 def write_connection_file(self):
184 def write_connection_file(self):
185 """write connection info to JSON file"""
185 """write connection info to JSON file"""
186 if os.path.basename(self.connection_file) == self.connection_file:
186 if os.path.basename(self.connection_file) == self.connection_file:
187 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
187 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
188 else:
188 else:
189 cf = self.connection_file
189 cf = self.connection_file
190 write_connection_file(cf, ip=self.ip, key=self.session.key,
190 write_connection_file(cf, ip=self.ip, key=self.session.key,
191 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
191 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
192 iopub_port=self.iopub_port)
192 iopub_port=self.iopub_port)
193
193
194 def init_connection_file(self):
194 def init_connection_file(self):
195 if not self.connection_file:
195 if not self.connection_file:
196 self.connection_file = "kernel-%s.json"%os.getpid()
196 self.connection_file = "kernel-%s.json"%os.getpid()
197 try:
197 try:
198 self.load_connection_file()
198 self.load_connection_file()
199 except Exception:
199 except Exception:
200 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
200 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
201 self.exit(1)
201 self.exit(1)
202
202
203 def init_sockets(self):
203 def init_sockets(self):
204 # Create a context, a session, and the kernel sockets.
204 # Create a context, a session, and the kernel sockets.
205 self.log.info("Starting the kernel at pid: %i", os.getpid())
205 self.log.info("Starting the kernel at pid: %i", os.getpid())
206 context = zmq.Context.instance()
206 context = zmq.Context.instance()
207 # Uncomment this to try closing the context.
207 # Uncomment this to try closing the context.
208 # atexit.register(context.term)
208 # atexit.register(context.term)
209
209
210 self.shell_socket = context.socket(zmq.ROUTER)
210 self.shell_socket = context.socket(zmq.ROUTER)
211 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
211 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
212 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
212 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
213
213
214 self.iopub_socket = context.socket(zmq.PUB)
214 self.iopub_socket = context.socket(zmq.PUB)
215 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
215 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
216 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
216 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
217
217
218 self.stdin_socket = context.socket(zmq.ROUTER)
218 self.stdin_socket = context.socket(zmq.ROUTER)
219 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
219 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
220 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
220 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
221
221
222 def init_heartbeat(self):
223 """start the heart beating"""
222 # heartbeat doesn't share context, because it mustn't be blocked
224 # heartbeat doesn't share context, because it mustn't be blocked
223 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
225 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
224 hb_ctx = zmq.Context()
226 hb_ctx = zmq.Context()
225 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
227 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
226 self.hb_port = self.heartbeat.port
228 self.hb_port = self.heartbeat.port
227 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
229 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
228 self.heartbeat.start()
230 self.heartbeat.start()
229
231
230 # Helper to make it easier to connect to an existing kernel.
232 # Helper to make it easier to connect to an existing kernel.
231 # set log-level to critical, to make sure it is output
233 # set log-level to critical, to make sure it is output
232 self.log.critical("To connect another client to this kernel, use:")
234 self.log.critical("To connect another client to this kernel, use:")
233
235
236 def log_connection_info(self):
237 """display connection info, and store ports"""
234 basename = os.path.basename(self.connection_file)
238 basename = os.path.basename(self.connection_file)
235 if basename == self.connection_file or \
239 if basename == self.connection_file or \
236 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
240 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
237 # use shortname
241 # use shortname
238 tail = basename
242 tail = basename
239 if self.profile != 'default':
243 if self.profile != 'default':
240 tail += " --profile %s" % self.profile
244 tail += " --profile %s" % self.profile
241 else:
245 else:
242 tail = self.connection_file
246 tail = self.connection_file
243 self.log.critical("--existing %s", tail)
247 self.log.critical("--existing %s", tail)
244
248
245
249
246 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
250 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
247 stdin=self.stdin_port, hb=self.hb_port)
251 stdin=self.stdin_port, hb=self.hb_port)
248
252
249 def init_session(self):
253 def init_session(self):
250 """create our session object"""
254 """create our session object"""
251 default_secure(self.config)
255 default_secure(self.config)
252 self.session = Session(config=self.config, username=u'kernel')
256 self.session = Session(config=self.config, username=u'kernel')
253
257
254 def init_blackhole(self):
258 def init_blackhole(self):
255 """redirects stdout/stderr to devnull if necessary"""
259 """redirects stdout/stderr to devnull if necessary"""
256 if self.no_stdout or self.no_stderr:
260 if self.no_stdout or self.no_stderr:
257 blackhole = open(os.devnull, 'w')
261 blackhole = open(os.devnull, 'w')
258 if self.no_stdout:
262 if self.no_stdout:
259 sys.stdout = sys.__stdout__ = blackhole
263 sys.stdout = sys.__stdout__ = blackhole
260 if self.no_stderr:
264 if self.no_stderr:
261 sys.stderr = sys.__stderr__ = blackhole
265 sys.stderr = sys.__stderr__ = blackhole
262
266
263 def init_io(self):
267 def init_io(self):
264 """Redirect input streams and set a display hook."""
268 """Redirect input streams and set a display hook."""
265 if self.outstream_class:
269 if self.outstream_class:
266 outstream_factory = import_item(str(self.outstream_class))
270 outstream_factory = import_item(str(self.outstream_class))
267 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
271 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
268 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
272 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
269 if self.displayhook_class:
273 if self.displayhook_class:
270 displayhook_factory = import_item(str(self.displayhook_class))
274 displayhook_factory = import_item(str(self.displayhook_class))
271 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
275 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
272
276
273 def init_signal(self):
277 def init_signal(self):
274 signal.signal(signal.SIGINT, signal.SIG_IGN)
278 signal.signal(signal.SIGINT, signal.SIG_IGN)
275
279
276 def init_kernel(self):
280 def init_kernel(self):
277 """Create the Kernel object itself"""
281 """Create the Kernel object itself"""
278 kernel_factory = import_item(str(self.kernel_class))
282 kernel_factory = import_item(str(self.kernel_class))
279 self.kernel = kernel_factory(config=self.config, session=self.session,
283 self.kernel = kernel_factory(config=self.config, session=self.session,
280 shell_socket=self.shell_socket,
284 shell_socket=self.shell_socket,
281 iopub_socket=self.iopub_socket,
285 iopub_socket=self.iopub_socket,
282 stdin_socket=self.stdin_socket,
286 stdin_socket=self.stdin_socket,
283 log=self.log
287 log=self.log
284 )
288 )
285 self.kernel.record_ports(self.ports)
289 self.kernel.record_ports(self.ports)
286
290
287 @catch_config_error
291 @catch_config_error
288 def initialize(self, argv=None):
292 def initialize(self, argv=None):
289 super(KernelApp, self).initialize(argv)
293 super(KernelApp, self).initialize(argv)
290 self.init_blackhole()
294 self.init_blackhole()
291 self.init_connection_file()
295 self.init_connection_file()
292 self.init_session()
296 self.init_session()
293 self.init_poller()
297 self.init_poller()
294 self.init_sockets()
298 self.init_sockets()
295 # writing connection file must be *after* init_sockets
299 self.init_heartbeat()
300 # writing/displaying connection info must be *after* init_sockets/heartbeat
301 self.log_connection_info()
296 self.write_connection_file()
302 self.write_connection_file()
297 self.init_io()
303 self.init_io()
298 self.init_signal()
304 self.init_signal()
299 self.init_kernel()
305 self.init_kernel()
300 # flush stdout/stderr, so that anything written to these streams during
306 # flush stdout/stderr, so that anything written to these streams during
301 # initialization do not get associated with the first execution request
307 # initialization do not get associated with the first execution request
302 sys.stdout.flush()
308 sys.stdout.flush()
303 sys.stderr.flush()
309 sys.stderr.flush()
304
310
305 def start(self):
311 def start(self):
306 if self.poller is not None:
312 if self.poller is not None:
307 self.poller.start()
313 self.poller.start()
308 self.kernel.start()
314 self.kernel.start()
309 try:
315 try:
310 ioloop.IOLoop.instance().start()
316 ioloop.IOLoop.instance().start()
311 except KeyboardInterrupt:
317 except KeyboardInterrupt:
312 pass
318 pass
313
319
General Comments 0
You need to be logged in to leave comments. Login now