##// END OF EJS Templates
split init_sockets into a few steps, so they can be better reused
MinRK -
Show More
@@ -1,313 +1,319 b''
1 1 """An Application for launching a kernel
2 2
3 3 Authors
4 4 -------
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING.txt, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports
19 19 import json
20 20 import os
21 21 import sys
22 22 import signal
23 23
24 24 # System library imports
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27
28 28 # IPython imports
29 29 from IPython.core.ultratb import FormattedTB
30 30 from IPython.core.application import (
31 31 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
32 32 )
33 33 from IPython.utils import io
34 34 from IPython.utils.localinterfaces import LOCALHOST
35 35 from IPython.utils.path import filefind
36 36 from IPython.utils.py3compat import str_to_bytes
37 37 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
38 38 DottedObjectName)
39 39 from IPython.utils.importstring import import_item
40 40 # local imports
41 41 from IPython.zmq.entry_point import write_connection_file
42 42 from IPython.zmq.heartbeat import Heartbeat
43 43 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
44 44 from IPython.zmq.session import (
45 45 Session, session_flags, session_aliases, default_secure,
46 46 )
47 47
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Flags and Aliases
51 51 #-----------------------------------------------------------------------------
52 52
53 53 kernel_aliases = dict(base_aliases)
54 54 kernel_aliases.update({
55 55 'ip' : 'KernelApp.ip',
56 56 'hb' : 'KernelApp.hb_port',
57 57 'shell' : 'KernelApp.shell_port',
58 58 'iopub' : 'KernelApp.iopub_port',
59 59 'stdin' : 'KernelApp.stdin_port',
60 60 'f' : 'KernelApp.connection_file',
61 61 'parent': 'KernelApp.parent',
62 62 })
63 63 if sys.platform.startswith('win'):
64 64 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
65 65
66 66 kernel_flags = dict(base_flags)
67 67 kernel_flags.update({
68 68 'no-stdout' : (
69 69 {'KernelApp' : {'no_stdout' : True}},
70 70 "redirect stdout to the null device"),
71 71 'no-stderr' : (
72 72 {'KernelApp' : {'no_stderr' : True}},
73 73 "redirect stderr to the null device"),
74 74 })
75 75
76 76 # inherit flags&aliases for Sessions
77 77 kernel_aliases.update(session_aliases)
78 78 kernel_flags.update(session_flags)
79 79
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # Application class for starting a Kernel
84 84 #-----------------------------------------------------------------------------
85 85
86 86 class KernelApp(BaseIPythonApplication):
87 87 name='ipkernel'
88 88 aliases = Dict(kernel_aliases)
89 89 flags = Dict(kernel_flags)
90 90 classes = [Session]
91 91 # the kernel class, as an importstring
92 92 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
93 93 kernel = Any()
94 94 poller = Any() # don't restrict this even though current pollers are all Threads
95 95 heartbeat = Instance(Heartbeat)
96 96 session = Instance('IPython.zmq.session.Session')
97 97 ports = Dict()
98 98
99 99 # inherit config file name from parent:
100 100 parent_appname = Unicode(config=True)
101 101 def _parent_appname_changed(self, name, old, new):
102 102 if self.config_file_specified:
103 103 # it was manually specified, ignore
104 104 return
105 105 self.config_file_name = new.replace('-','_') + u'_config.py'
106 106 # don't let this count as specifying the config file
107 107 self.config_file_specified = False
108 108
109 109 # connection info:
110 110 ip = Unicode(LOCALHOST, config=True,
111 111 help="Set the IP or interface on which the kernel will listen.")
112 112 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
113 113 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
114 114 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
115 115 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
116 116 connection_file = Unicode('', config=True,
117 117 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
118 118
119 119 This file will contain the IP, ports, and authentication key needed to connect
120 120 clients to this kernel. By default, this file will be created in the security-dir
121 121 of the current profile, but can be specified by absolute path.
122 122 """)
123 123
124 124 # streams, etc.
125 125 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
126 126 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
127 127 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
128 128 config=True, help="The importstring for the OutStream factory")
129 129 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
130 130 config=True, help="The importstring for the DisplayHook factory")
131 131
132 132 # polling
133 133 parent = Integer(0, config=True,
134 134 help="""kill this process if its parent dies. On Windows, the argument
135 135 specifies the HANDLE of the parent process, otherwise it is simply boolean.
136 136 """)
137 137 interrupt = Integer(0, config=True,
138 138 help="""ONLY USED ON WINDOWS
139 139 Interrupt this process when the parent is signalled.
140 140 """)
141 141
142 142 def init_crash_handler(self):
143 143 # Install minimal exception handling
144 144 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
145 145 ostream=sys.__stdout__)
146 146
147 147 def init_poller(self):
148 148 if sys.platform == 'win32':
149 149 if self.interrupt or self.parent:
150 150 self.poller = ParentPollerWindows(self.interrupt, self.parent)
151 151 elif self.parent:
152 152 self.poller = ParentPollerUnix()
153 153
154 154 def _bind_socket(self, s, port):
155 155 iface = 'tcp://%s' % self.ip
156 156 if port <= 0:
157 157 port = s.bind_to_random_port(iface)
158 158 else:
159 159 s.bind(iface + ':%i'%port)
160 160 return port
161 161
162 162 def load_connection_file(self):
163 163 """load ip/port/hmac config from JSON connection file"""
164 164 try:
165 165 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
166 166 except IOError:
167 167 self.log.debug("Connection file not found: %s", self.connection_file)
168 168 return
169 169 self.log.debug(u"Loading connection file %s", fname)
170 170 with open(fname) as f:
171 171 s = f.read()
172 172 cfg = json.loads(s)
173 173 if self.ip == LOCALHOST and 'ip' in cfg:
174 174 # not overridden by config or cl_args
175 175 self.ip = cfg['ip']
176 176 for channel in ('hb', 'shell', 'iopub', 'stdin'):
177 177 name = channel + '_port'
178 178 if getattr(self, name) == 0 and name in cfg:
179 179 # not overridden by config or cl_args
180 180 setattr(self, name, cfg[name])
181 181 if 'key' in cfg:
182 182 self.config.Session.key = str_to_bytes(cfg['key'])
183 183
184 184 def write_connection_file(self):
185 185 """write connection info to JSON file"""
186 186 if os.path.basename(self.connection_file) == self.connection_file:
187 187 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
188 188 else:
189 189 cf = self.connection_file
190 190 write_connection_file(cf, ip=self.ip, key=self.session.key,
191 191 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
192 192 iopub_port=self.iopub_port)
193 193
194 194 def init_connection_file(self):
195 195 if not self.connection_file:
196 196 self.connection_file = "kernel-%s.json"%os.getpid()
197 197 try:
198 198 self.load_connection_file()
199 199 except Exception:
200 200 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
201 201 self.exit(1)
202 202
203 203 def init_sockets(self):
204 204 # Create a context, a session, and the kernel sockets.
205 205 self.log.info("Starting the kernel at pid: %i", os.getpid())
206 206 context = zmq.Context.instance()
207 207 # Uncomment this to try closing the context.
208 208 # atexit.register(context.term)
209 209
210 210 self.shell_socket = context.socket(zmq.ROUTER)
211 211 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
212 212 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
213 213
214 214 self.iopub_socket = context.socket(zmq.PUB)
215 215 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
216 216 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
217 217
218 218 self.stdin_socket = context.socket(zmq.ROUTER)
219 219 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
220 220 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
221
221
222 def init_heartbeat(self):
223 """start the heart beating"""
222 224 # heartbeat doesn't share context, because it mustn't be blocked
223 225 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
224 226 hb_ctx = zmq.Context()
225 227 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
226 228 self.hb_port = self.heartbeat.port
227 229 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
228 230 self.heartbeat.start()
229 231
230 232 # Helper to make it easier to connect to an existing kernel.
231 233 # set log-level to critical, to make sure it is output
232 234 self.log.critical("To connect another client to this kernel, use:")
233
235
236 def log_connection_info(self):
237 """display connection info, and store ports"""
234 238 basename = os.path.basename(self.connection_file)
235 239 if basename == self.connection_file or \
236 240 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
237 241 # use shortname
238 242 tail = basename
239 243 if self.profile != 'default':
240 244 tail += " --profile %s" % self.profile
241 245 else:
242 246 tail = self.connection_file
243 247 self.log.critical("--existing %s", tail)
244 248
245 249
246 250 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
247 251 stdin=self.stdin_port, hb=self.hb_port)
248 252
249 253 def init_session(self):
250 254 """create our session object"""
251 255 default_secure(self.config)
252 256 self.session = Session(config=self.config, username=u'kernel')
253 257
254 258 def init_blackhole(self):
255 259 """redirects stdout/stderr to devnull if necessary"""
256 260 if self.no_stdout or self.no_stderr:
257 261 blackhole = open(os.devnull, 'w')
258 262 if self.no_stdout:
259 263 sys.stdout = sys.__stdout__ = blackhole
260 264 if self.no_stderr:
261 265 sys.stderr = sys.__stderr__ = blackhole
262 266
263 267 def init_io(self):
264 268 """Redirect input streams and set a display hook."""
265 269 if self.outstream_class:
266 270 outstream_factory = import_item(str(self.outstream_class))
267 271 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
268 272 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
269 273 if self.displayhook_class:
270 274 displayhook_factory = import_item(str(self.displayhook_class))
271 275 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
272 276
273 277 def init_signal(self):
274 278 signal.signal(signal.SIGINT, signal.SIG_IGN)
275 279
276 280 def init_kernel(self):
277 281 """Create the Kernel object itself"""
278 282 kernel_factory = import_item(str(self.kernel_class))
279 283 self.kernel = kernel_factory(config=self.config, session=self.session,
280 284 shell_socket=self.shell_socket,
281 285 iopub_socket=self.iopub_socket,
282 286 stdin_socket=self.stdin_socket,
283 287 log=self.log
284 288 )
285 289 self.kernel.record_ports(self.ports)
286 290
287 291 @catch_config_error
288 292 def initialize(self, argv=None):
289 293 super(KernelApp, self).initialize(argv)
290 294 self.init_blackhole()
291 295 self.init_connection_file()
292 296 self.init_session()
293 297 self.init_poller()
294 298 self.init_sockets()
295 # writing connection file must be *after* init_sockets
299 self.init_heartbeat()
300 # writing/displaying connection info must be *after* init_sockets/heartbeat
301 self.log_connection_info()
296 302 self.write_connection_file()
297 303 self.init_io()
298 304 self.init_signal()
299 305 self.init_kernel()
300 306 # flush stdout/stderr, so that anything written to these streams during
301 307 # initialization do not get associated with the first execution request
302 308 sys.stdout.flush()
303 309 sys.stderr.flush()
304 310
305 311 def start(self):
306 312 if self.poller is not None:
307 313 self.poller.start()
308 314 self.kernel.start()
309 315 try:
310 316 ioloop.IOLoop.instance().start()
311 317 except KeyboardInterrupt:
312 318 pass
313 319
General Comments 0
You need to be logged in to leave comments. Login now