##// END OF EJS Templates
Merge pull request #1868 from minrk/ipc...
Bussonnier Matthias -
r8501:af13023a merge
parent child Browse files
Show More
@@ -1,356 +1,378 b''
1 1 """ A minimal application base mixin for all ZMQ based IPython frontends.
2 2
3 3 This is not a complete console app, as subprocess will not be able to receive
4 4 input, there is no real readline support, among other limitations. This is a
5 5 refactoring of what used to be the IPython/frontend/qt/console/qtconsoleapp.py
6 6
7 7 Authors:
8 8
9 9 * Evan Patterson
10 10 * Min RK
11 11 * Erik Tollerud
12 12 * Fernando Perez
13 13 * Bussonnier Matthias
14 14 * Thomas Kluyver
15 15 * Paul Ivanov
16 16
17 17 """
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 # stdlib imports
24 24 import atexit
25 25 import json
26 26 import os
27 import shutil
27 28 import signal
28 29 import sys
29 30 import uuid
30 31
31 32
32 33 # Local imports
33 34 from IPython.config.application import boolean_flag
34 35 from IPython.config.configurable import Configurable
35 36 from IPython.core.profiledir import ProfileDir
36 37 from IPython.lib.kernel import tunnel_to_kernel, find_connection_file, swallow_argv
37 38 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
38 39 from IPython.utils.path import filefind
39 40 from IPython.utils.py3compat import str_to_bytes
40 41 from IPython.utils.traitlets import (
41 Dict, List, Unicode, CUnicode, Int, CBool, Any
42 Dict, List, Unicode, CUnicode, Int, CBool, Any, CaselessStrEnum
42 43 )
43 44 from IPython.zmq.ipkernel import (
44 45 flags as ipkernel_flags,
45 46 aliases as ipkernel_aliases,
46 47 IPKernelApp
47 48 )
48 49 from IPython.zmq.session import Session, default_secure
49 50 from IPython.zmq.zmqshell import ZMQInteractiveShell
50 51
51 52 #-----------------------------------------------------------------------------
52 53 # Network Constants
53 54 #-----------------------------------------------------------------------------
54 55
55 56 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
56 57
57 58 #-----------------------------------------------------------------------------
58 59 # Globals
59 60 #-----------------------------------------------------------------------------
60 61
61 62
62 63 #-----------------------------------------------------------------------------
63 64 # Aliases and Flags
64 65 #-----------------------------------------------------------------------------
65 66
66 67 flags = dict(ipkernel_flags)
67 68
68 69 # the flags that are specific to the frontend
69 70 # these must be scrubbed before being passed to the kernel,
70 71 # or it will raise an error on unrecognized flags
71 72 app_flags = {
72 73 'existing' : ({'IPythonConsoleApp' : {'existing' : 'kernel*.json'}},
73 74 "Connect to an existing kernel. If no argument specified, guess most recent"),
74 75 }
75 76 app_flags.update(boolean_flag(
76 77 'confirm-exit', 'IPythonConsoleApp.confirm_exit',
77 78 """Set to display confirmation dialog on exit. You can always use 'exit' or 'quit',
78 79 to force a direct exit without any confirmation.
79 80 """,
80 81 """Don't prompt the user when exiting. This will terminate the kernel
81 82 if it is owned by the frontend, and leave it alive if it is external.
82 83 """
83 84 ))
84 85 flags.update(app_flags)
85 86
86 87 aliases = dict(ipkernel_aliases)
87 88
88 89 # also scrub aliases from the frontend
89 90 app_aliases = dict(
90 91 hb = 'IPythonConsoleApp.hb_port',
91 92 shell = 'IPythonConsoleApp.shell_port',
92 93 iopub = 'IPythonConsoleApp.iopub_port',
93 94 stdin = 'IPythonConsoleApp.stdin_port',
94 95 ip = 'IPythonConsoleApp.ip',
95 96 existing = 'IPythonConsoleApp.existing',
96 97 f = 'IPythonConsoleApp.connection_file',
97 98
98 99
99 100 ssh = 'IPythonConsoleApp.sshserver',
100 101 )
101 102 aliases.update(app_aliases)
102 103
103 104 #-----------------------------------------------------------------------------
104 105 # Classes
105 106 #-----------------------------------------------------------------------------
106 107
107 108 #-----------------------------------------------------------------------------
108 109 # IPythonConsole
109 110 #-----------------------------------------------------------------------------
110 111
111 112 classes = [IPKernelApp, ZMQInteractiveShell, ProfileDir, Session]
112 113
113 114 try:
114 115 from IPython.zmq.pylab.backend_inline import InlineBackend
115 116 except ImportError:
116 117 pass
117 118 else:
118 119 classes.append(InlineBackend)
119 120
120 121 class IPythonConsoleApp(Configurable):
121 122 name = 'ipython-console-mixin'
122 123 default_config_file_name='ipython_config.py'
123 124
124 125 description = """
125 126 The IPython Mixin Console.
126 127
127 128 This class contains the common portions of console client (QtConsole,
128 129 ZMQ-based terminal console, etc). It is not a full console, in that
129 130 launched terminal subprocesses will not be able to accept input.
130 131
131 132 The Console using this mixing supports various extra features beyond
132 133 the single-process Terminal IPython shell, such as connecting to
133 134 existing kernel, via:
134 135
135 136 ipython <appname> --existing
136 137
137 138 as well as tunnel via SSH
138 139
139 140 """
140 141
141 142 classes = classes
142 143 flags = Dict(flags)
143 144 aliases = Dict(aliases)
144 145 kernel_manager_class = BlockingKernelManager
145 146
146 147 kernel_argv = List(Unicode)
147 148 # frontend flags&aliases to be stripped when building kernel_argv
148 149 frontend_flags = Any(app_flags)
149 150 frontend_aliases = Any(app_aliases)
150 151
151 152 # create requested profiles by default, if they don't exist:
152 153 auto_create = CBool(True)
153 154 # connection info:
154 ip = Unicode(LOCALHOST, config=True,
155
156 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
157
158 ip = Unicode(config=True,
155 159 help="""Set the kernel\'s IP address [default localhost].
156 160 If the IP address is something other than localhost, then
157 161 Consoles on other machines will be able to connect
158 162 to the Kernel, so be careful!"""
159 163 )
164 def _ip_default(self):
165 if self.transport == 'tcp':
166 return LOCALHOST
167 else:
168 # this can fire early if ip is given,
169 # in which case our return value is meaningless
170 if not hasattr(self, 'profile_dir'):
171 return ''
172 ipcdir = os.path.join(self.profile_dir.security_dir, 'kernel-%s' % os.getpid())
173 os.makedirs(ipcdir)
174 atexit.register(lambda : shutil.rmtree(ipcdir))
175 return os.path.join(ipcdir, 'ipc')
160 176
161 177 sshserver = Unicode('', config=True,
162 178 help="""The SSH server to use to connect to the kernel.""")
163 179 sshkey = Unicode('', config=True,
164 180 help="""Path to the ssh key to use for logging in to the ssh server.""")
165 181
166 182 hb_port = Int(0, config=True,
167 183 help="set the heartbeat port [default: random]")
168 184 shell_port = Int(0, config=True,
169 185 help="set the shell (ROUTER) port [default: random]")
170 186 iopub_port = Int(0, config=True,
171 187 help="set the iopub (PUB) port [default: random]")
172 188 stdin_port = Int(0, config=True,
173 189 help="set the stdin (DEALER) port [default: random]")
174 190 connection_file = Unicode('', config=True,
175 191 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
176 192
177 193 This file will contain the IP, ports, and authentication key needed to connect
178 194 clients to this kernel. By default, this file will be created in the security-dir
179 195 of the current profile, but can be specified by absolute path.
180 196 """)
181 197 def _connection_file_default(self):
182 198 return 'kernel-%i.json' % os.getpid()
183 199
184 200 existing = CUnicode('', config=True,
185 201 help="""Connect to an already running kernel""")
186 202
187 203 confirm_exit = CBool(True, config=True,
188 204 help="""
189 205 Set to display confirmation dialog on exit. You can always use 'exit' or 'quit',
190 206 to force a direct exit without any confirmation.""",
191 207 )
192 208
193 209
194 210 def build_kernel_argv(self, argv=None):
195 211 """build argv to be passed to kernel subprocess"""
196 212 if argv is None:
197 213 argv = sys.argv[1:]
198 214 self.kernel_argv = swallow_argv(argv, self.frontend_aliases, self.frontend_flags)
199 215 # kernel should inherit default config file from frontend
200 216 self.kernel_argv.append("--KernelApp.parent_appname='%s'"%self.name)
201 217
202 218 def init_connection_file(self):
203 219 """find the connection file, and load the info if found.
204 220
205 221 The current working directory and the current profile's security
206 222 directory will be searched for the file if it is not given by
207 223 absolute path.
208 224
209 225 When attempting to connect to an existing kernel and the `--existing`
210 226 argument does not match an existing file, it will be interpreted as a
211 227 fileglob, and the matching file in the current profile's security dir
212 228 with the latest access time will be used.
213 229
214 230 After this method is called, self.connection_file contains the *full path*
215 231 to the connection file, never just its name.
216 232 """
217 233 if self.existing:
218 234 try:
219 235 cf = find_connection_file(self.existing)
220 236 except Exception:
221 237 self.log.critical("Could not find existing kernel connection file %s", self.existing)
222 238 self.exit(1)
223 239 self.log.info("Connecting to existing kernel: %s" % cf)
224 240 self.connection_file = cf
225 241 else:
226 242 # not existing, check if we are going to write the file
227 243 # and ensure that self.connection_file is a full path, not just the shortname
228 244 try:
229 245 cf = find_connection_file(self.connection_file)
230 246 except Exception:
231 247 # file might not exist
232 248 if self.connection_file == os.path.basename(self.connection_file):
233 249 # just shortname, put it in security dir
234 250 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
235 251 else:
236 252 cf = self.connection_file
237 253 self.connection_file = cf
238 254
239 255 # should load_connection_file only be used for existing?
240 256 # as it is now, this allows reusing ports if an existing
241 257 # file is requested
242 258 try:
243 259 self.load_connection_file()
244 260 except Exception:
245 261 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
246 262 self.exit(1)
247 263
248 264 def load_connection_file(self):
249 265 """load ip/port/hmac config from JSON connection file"""
250 266 # this is identical to KernelApp.load_connection_file
251 267 # perhaps it can be centralized somewhere?
252 268 try:
253 269 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
254 270 except IOError:
255 271 self.log.debug("Connection File not found: %s", self.connection_file)
256 272 return
257 273 self.log.debug(u"Loading connection file %s", fname)
258 274 with open(fname) as f:
259 s = f.read()
260 cfg = json.loads(s)
261 if self.ip == LOCALHOST and 'ip' in cfg:
262 # not overridden by config or cl_args
275 cfg = json.load(f)
276
277 self.transport = cfg.get('transport', 'tcp')
278 if 'ip' in cfg:
263 279 self.ip = cfg['ip']
264 280 for channel in ('hb', 'shell', 'iopub', 'stdin'):
265 281 name = channel + '_port'
266 282 if getattr(self, name) == 0 and name in cfg:
267 283 # not overridden by config or cl_args
268 284 setattr(self, name, cfg[name])
269 285 if 'key' in cfg:
270 286 self.config.Session.key = str_to_bytes(cfg['key'])
287
271 288
272 289 def init_ssh(self):
273 290 """set up ssh tunnels, if needed."""
274 291 if not self.sshserver and not self.sshkey:
275 292 return
276 293
294 if self.transport != 'tcp':
295 self.log.error("Can only use ssh tunnels with TCP sockets, not %s", self.transport)
296 return
297
277 298 if self.sshkey and not self.sshserver:
278 299 # specifying just the key implies that we are connecting directly
279 300 self.sshserver = self.ip
280 301 self.ip = LOCALHOST
281 302
282 303 # build connection dict for tunnels:
283 304 info = dict(ip=self.ip,
284 305 shell_port=self.shell_port,
285 306 iopub_port=self.iopub_port,
286 307 stdin_port=self.stdin_port,
287 308 hb_port=self.hb_port
288 309 )
289 310
290 311 self.log.info("Forwarding connections to %s via %s"%(self.ip, self.sshserver))
291 312
292 313 # tunnels return a new set of ports, which will be on localhost:
293 314 self.ip = LOCALHOST
294 315 try:
295 316 newports = tunnel_to_kernel(info, self.sshserver, self.sshkey)
296 317 except:
297 318 # even catch KeyboardInterrupt
298 319 self.log.error("Could not setup tunnels", exc_info=True)
299 320 self.exit(1)
300 321
301 322 self.shell_port, self.iopub_port, self.stdin_port, self.hb_port = newports
302 323
303 324 cf = self.connection_file
304 325 base,ext = os.path.splitext(cf)
305 326 base = os.path.basename(base)
306 327 self.connection_file = os.path.basename(base)+'-ssh'+ext
307 328 self.log.critical("To connect another client via this tunnel, use:")
308 329 self.log.critical("--existing %s" % self.connection_file)
309 330
310 331 def _new_connection_file(self):
311 332 cf = ''
312 333 while not cf:
313 334 # we don't need a 128b id to distinguish kernels, use more readable
314 335 # 48b node segment (12 hex chars). Users running more than 32k simultaneous
315 336 # kernels can subclass.
316 337 ident = str(uuid.uuid4()).split('-')[-1]
317 338 cf = os.path.join(self.profile_dir.security_dir, 'kernel-%s.json' % ident)
318 339 # only keep if it's actually new. Protect against unlikely collision
319 340 # in 48b random search space
320 341 cf = cf if not os.path.exists(cf) else ''
321 342 return cf
322 343
323 344 def init_kernel_manager(self):
324 345 # Don't let Qt or ZMQ swallow KeyboardInterupts.
325 346 signal.signal(signal.SIGINT, signal.SIG_DFL)
326 347
327 348 # Create a KernelManager and start a kernel.
328 349 self.kernel_manager = self.kernel_manager_class(
350 transport=self.transport,
329 351 ip=self.ip,
330 352 shell_port=self.shell_port,
331 353 iopub_port=self.iopub_port,
332 354 stdin_port=self.stdin_port,
333 355 hb_port=self.hb_port,
334 356 connection_file=self.connection_file,
335 357 config=self.config,
336 358 )
337 359 # start the kernel
338 360 if not self.existing:
339 361 self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv)
340 362 elif self.sshserver:
341 363 # ssh, write new connection file
342 364 self.kernel_manager.write_connection_file()
343 365 atexit.register(self.kernel_manager.cleanup_connection_file)
344 366 self.kernel_manager.start_channels()
345 367
346 368
347 369 def initialize(self, argv=None):
348 370 """
349 371 Classes which mix this class in should call:
350 372 IPythonConsoleApp.initialize(self,argv)
351 373 """
352 374 self.init_connection_file()
353 375 default_secure(self.config)
354 376 self.init_ssh()
355 377 self.init_kernel_manager()
356 378
@@ -1,212 +1,222 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import atexit
7 7 import json
8 8 import os
9 9 import socket
10 10 from subprocess import Popen, PIPE
11 11 import sys
12 12 import tempfile
13 13
14 14 # System library imports
15 15
16 16 # IPython imports
17 17 from IPython.utils.localinterfaces import LOCALHOST
18 18 from IPython.utils.py3compat import bytes_to_str
19 19
20 20 # Local imports.
21 21 from parentpoller import ParentPollerWindows
22 22
23 23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 ip=LOCALHOST, key=b''):
24 ip=LOCALHOST, key=b'', transport='tcp'):
25 25 """Generates a JSON config file, including the selection of random ports.
26 26
27 27 Parameters
28 28 ----------
29 29
30 30 fname : unicode
31 31 The path to the file to write
32 32
33 33 shell_port : int, optional
34 34 The port to use for ROUTER channel.
35 35
36 36 iopub_port : int, optional
37 37 The port to use for the SUB channel.
38 38
39 39 stdin_port : int, optional
40 40 The port to use for the REQ (raw input) channel.
41 41
42 42 hb_port : int, optional
43 43 The port to use for the hearbeat REP channel.
44 44
45 45 ip : str, optional
46 46 The ip address the kernel will bind to.
47 47
48 48 key : str, optional
49 49 The Session key used for HMAC authentication.
50 50
51 51 """
52 52 # default to temporary connector file
53 53 if not fname:
54 54 fname = tempfile.mktemp('.json')
55 55
56 56 # Find open ports as necessary.
57
57 58 ports = []
58 59 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 60 int(stdin_port <= 0) + int(hb_port <= 0)
60 for i in xrange(ports_needed):
61 sock = socket.socket()
62 sock.bind(('', 0))
63 ports.append(sock)
64 for i, sock in enumerate(ports):
65 port = sock.getsockname()[1]
66 sock.close()
67 ports[i] = port
61 if transport == 'tcp':
62 for i in range(ports_needed):
63 sock = socket.socket()
64 sock.bind(('', 0))
65 ports.append(sock)
66 for i, sock in enumerate(ports):
67 port = sock.getsockname()[1]
68 sock.close()
69 ports[i] = port
70 else:
71 N = 1
72 for i in range(ports_needed):
73 while os.path.exists("%s-%s" % (ip, str(N))):
74 N += 1
75 ports.append(N)
76 N += 1
68 77 if shell_port <= 0:
69 78 shell_port = ports.pop(0)
70 79 if iopub_port <= 0:
71 80 iopub_port = ports.pop(0)
72 81 if stdin_port <= 0:
73 82 stdin_port = ports.pop(0)
74 83 if hb_port <= 0:
75 84 hb_port = ports.pop(0)
76 85
77 86 cfg = dict( shell_port=shell_port,
78 87 iopub_port=iopub_port,
79 88 stdin_port=stdin_port,
80 89 hb_port=hb_port,
81 90 )
82 91 cfg['ip'] = ip
83 92 cfg['key'] = bytes_to_str(key)
93 cfg['transport'] = transport
84 94
85 95 with open(fname, 'w') as f:
86 96 f.write(json.dumps(cfg, indent=2))
87 97
88 98 return fname, cfg
89 99
90 100
91 101 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
92 102 executable=None, independent=False, extra_arguments=[],
93 103 cwd=None):
94 104 """ Launches a localhost kernel, binding to the specified ports.
95 105
96 106 Parameters
97 107 ----------
98 108 code : str,
99 109 A string of Python code that imports and executes a kernel entry point.
100 110
101 111 stdin, stdout, stderr : optional (default None)
102 112 Standards streams, as defined in subprocess.Popen.
103 113
104 114 fname : unicode, optional
105 115 The JSON connector file, containing ip/port/hmac key information.
106 116
107 117 key : str, optional
108 118 The Session key used for HMAC authentication.
109 119
110 120 executable : str, optional (default sys.executable)
111 121 The Python executable to use for the kernel process.
112 122
113 123 independent : bool, optional (default False)
114 124 If set, the kernel process is guaranteed to survive if this process
115 125 dies. If not set, an effort is made to ensure that the kernel is killed
116 126 when this process dies. Note that in this case it is still good practice
117 127 to kill kernels manually before exiting.
118 128
119 129 extra_arguments : list, optional
120 130 A list of extra arguments to pass when executing the launch code.
121 131
122 132 cwd : path, optional
123 133 The working dir of the kernel process (default: cwd of this process).
124 134
125 135 Returns
126 136 -------
127 137 A tuple of form:
128 138 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
129 139 where kernel_process is a Popen object and the ports are integers.
130 140 """
131 141
132 142 # Build the kernel launch command.
133 143 if executable is None:
134 144 executable = sys.executable
135 145 arguments = [ executable, '-c', code, '-f', fname ]
136 146 arguments.extend(extra_arguments)
137 147
138 148 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
139 149 # are invalid. Unfortunately, there is in general no way to detect whether
140 150 # they are valid. The following two blocks redirect them to (temporary)
141 151 # pipes in certain important cases.
142 152
143 153 # If this process has been backgrounded, our stdin is invalid. Since there
144 154 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
145 155 # place this one safe and always redirect.
146 156 redirect_in = True
147 157 _stdin = PIPE if stdin is None else stdin
148 158
149 159 # If this process in running on pythonw, we know that stdin, stdout, and
150 160 # stderr are all invalid.
151 161 redirect_out = sys.executable.endswith('pythonw.exe')
152 162 if redirect_out:
153 163 _stdout = PIPE if stdout is None else stdout
154 164 _stderr = PIPE if stderr is None else stderr
155 165 else:
156 166 _stdout, _stderr = stdout, stderr
157 167
158 168 # Spawn a kernel.
159 169 if sys.platform == 'win32':
160 170 # Create a Win32 event for interrupting the kernel.
161 171 interrupt_event = ParentPollerWindows.create_interrupt_event()
162 172 arguments += [ '--interrupt=%i'%interrupt_event ]
163 173
164 174 # If the kernel is running on pythonw and stdout/stderr are not been
165 175 # re-directed, it will crash when more than 4KB of data is written to
166 176 # stdout or stderr. This is a bug that has been with Python for a very
167 177 # long time; see http://bugs.python.org/issue706263.
168 178 # A cleaner solution to this problem would be to pass os.devnull to
169 179 # Popen directly. Unfortunately, that does not work.
170 180 if executable.endswith('pythonw.exe'):
171 181 if stdout is None:
172 182 arguments.append('--no-stdout')
173 183 if stderr is None:
174 184 arguments.append('--no-stderr')
175 185
176 186 # Launch the kernel process.
177 187 if independent:
178 188 proc = Popen(arguments,
179 189 creationflags=512, # CREATE_NEW_PROCESS_GROUP
180 190 stdin=_stdin, stdout=_stdout, stderr=_stderr)
181 191 else:
182 192 from _subprocess import DuplicateHandle, GetCurrentProcess, \
183 193 DUPLICATE_SAME_ACCESS
184 194 pid = GetCurrentProcess()
185 195 handle = DuplicateHandle(pid, pid, pid, 0,
186 196 True, # Inheritable by new processes.
187 197 DUPLICATE_SAME_ACCESS)
188 198 proc = Popen(arguments + ['--parent=%i'%int(handle)],
189 199 stdin=_stdin, stdout=_stdout, stderr=_stderr)
190 200
191 201 # Attach the interrupt event to the Popen objet so it can be used later.
192 202 proc.win32_interrupt_event = interrupt_event
193 203
194 204 else:
195 205 if independent:
196 206 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
197 207 stdin=_stdin, stdout=_stdout, stderr=_stderr, cwd=cwd)
198 208 else:
199 209 proc = Popen(arguments + ['--parent=1'],
200 210 stdin=_stdin, stdout=_stdout, stderr=_stderr, cwd=cwd)
201 211
202 212 # Clean up pipes created to work around Popen bug.
203 213 if redirect_in:
204 214 if stdin is None:
205 215 proc.stdin.close()
206 216 if redirect_out:
207 217 if stdout is None:
208 218 proc.stdout.close()
209 219 if stderr is None:
210 220 proc.stderr.close()
211 221
212 222 return proc
@@ -1,48 +1,56 b''
1 1 """The client and server for a basic ping-pong style heartbeat.
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008-2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 import os
15 16 import socket
16 17 import sys
17 18 from threading import Thread
18 19
19 20 import zmq
20 21
21 22 from IPython.utils.localinterfaces import LOCALHOST
22 23
23 24 #-----------------------------------------------------------------------------
24 25 # Code
25 26 #-----------------------------------------------------------------------------
26 27
27 28
28 29 class Heartbeat(Thread):
29 30 "A simple ping-pong style heartbeat that runs in a thread."
30 31
31 def __init__(self, context, addr=(LOCALHOST, 0)):
32 def __init__(self, context, addr=('tcp', LOCALHOST, 0)):
32 33 Thread.__init__(self)
33 34 self.context = context
34 self.ip, self.port = addr
35 self.transport, self.ip, self.port = addr
35 36 if self.port == 0:
36 s = socket.socket()
37 # '*' means all interfaces to 0MQ, which is '' to socket.socket
38 s.bind(('' if self.ip == '*' else self.ip, 0))
39 self.port = s.getsockname()[1]
40 s.close()
37 if addr[0] == 'tcp':
38 s = socket.socket()
39 # '*' means all interfaces to 0MQ, which is '' to socket.socket
40 s.bind(('' if self.ip == '*' else self.ip, 0))
41 self.port = s.getsockname()[1]
42 s.close()
43 elif addr[0] == 'ipc':
44 while os.path.exists(self.ip + '-' + self.port):
45 self.port = self.port + 1
46 else:
47 raise ValueError("Unrecognized zmq transport: %s" % addr[0])
41 48 self.addr = (self.ip, self.port)
42 49 self.daemon = True
43 50
44 51 def run(self):
45 52 self.socket = self.context.socket(zmq.REP)
46 self.socket.bind('tcp://%s:%i' % self.addr)
53 c = ':' if self.transport == 'tcp' else '-'
54 self.socket.bind('%s://%s' % (self.transport, self.ip) + c + str(self.port))
47 55 zmq.device(zmq.FORWARDER, self.socket, self.socket)
48 56
@@ -1,333 +1,351 b''
1 1 """An Application for launching a kernel
2 2
3 3 Authors
4 4 -------
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING.txt, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports
19 19 import atexit
20 20 import json
21 21 import os
22 22 import sys
23 23 import signal
24 24
25 25 # System library imports
26 26 import zmq
27 27 from zmq.eventloop import ioloop
28 28
29 29 # IPython imports
30 30 from IPython.core.ultratb import FormattedTB
31 31 from IPython.core.application import (
32 32 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
33 33 )
34 34 from IPython.utils import io
35 35 from IPython.utils.localinterfaces import LOCALHOST
36 36 from IPython.utils.path import filefind
37 37 from IPython.utils.py3compat import str_to_bytes
38 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
39 DottedObjectName)
38 from IPython.utils.traitlets import (
39 Any, Instance, Dict, Unicode, Integer, Bool, CaselessStrEnum,
40 DottedObjectName,
41 )
40 42 from IPython.utils.importstring import import_item
41 43 # local imports
42 44 from IPython.zmq.entry_point import write_connection_file
43 45 from IPython.zmq.heartbeat import Heartbeat
44 46 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
45 47 from IPython.zmq.session import (
46 48 Session, session_flags, session_aliases, default_secure,
47 49 )
48 50
49 51
50 52 #-----------------------------------------------------------------------------
51 53 # Flags and Aliases
52 54 #-----------------------------------------------------------------------------
53 55
54 56 kernel_aliases = dict(base_aliases)
55 57 kernel_aliases.update({
56 58 'ip' : 'KernelApp.ip',
57 59 'hb' : 'KernelApp.hb_port',
58 60 'shell' : 'KernelApp.shell_port',
59 61 'iopub' : 'KernelApp.iopub_port',
60 62 'stdin' : 'KernelApp.stdin_port',
61 63 'f' : 'KernelApp.connection_file',
62 64 'parent': 'KernelApp.parent',
63 65 })
64 66 if sys.platform.startswith('win'):
65 67 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
66 68
67 69 kernel_flags = dict(base_flags)
68 70 kernel_flags.update({
69 71 'no-stdout' : (
70 72 {'KernelApp' : {'no_stdout' : True}},
71 73 "redirect stdout to the null device"),
72 74 'no-stderr' : (
73 75 {'KernelApp' : {'no_stderr' : True}},
74 76 "redirect stderr to the null device"),
75 77 })
76 78
77 79 # inherit flags&aliases for Sessions
78 80 kernel_aliases.update(session_aliases)
79 81 kernel_flags.update(session_flags)
80 82
81 83
82 84
83 85 #-----------------------------------------------------------------------------
84 86 # Application class for starting a Kernel
85 87 #-----------------------------------------------------------------------------
86 88
87 89 class KernelApp(BaseIPythonApplication):
88 90 name='ipkernel'
89 91 aliases = Dict(kernel_aliases)
90 92 flags = Dict(kernel_flags)
91 93 classes = [Session]
92 94 # the kernel class, as an importstring
93 95 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
94 96 kernel = Any()
95 97 poller = Any() # don't restrict this even though current pollers are all Threads
96 98 heartbeat = Instance(Heartbeat)
97 99 session = Instance('IPython.zmq.session.Session')
98 100 ports = Dict()
99 101 _full_connection_file = Unicode()
100 102
101 103 # inherit config file name from parent:
102 104 parent_appname = Unicode(config=True)
103 105 def _parent_appname_changed(self, name, old, new):
104 106 if self.config_file_specified:
105 107 # it was manually specified, ignore
106 108 return
107 109 self.config_file_name = new.replace('-','_') + u'_config.py'
108 110 # don't let this count as specifying the config file
109 111 self.config_file_specified = False
110 112
111 113 # connection info:
114 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
112 115 ip = Unicode(LOCALHOST, config=True,
113 116 help="Set the IP or interface on which the kernel will listen.")
114 117 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
115 118 shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]")
116 119 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
117 120 stdin_port = Integer(0, config=True, help="set the stdin (DEALER) port [default: random]")
118 121 connection_file = Unicode('', config=True,
119 122 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
120 123
121 124 This file will contain the IP, ports, and authentication key needed to connect
122 125 clients to this kernel. By default, this file will be created in the security-dir
123 126 of the current profile, but can be specified by absolute path.
124 127 """)
125 128
126 129 # streams, etc.
127 130 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
128 131 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
129 132 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
130 133 config=True, help="The importstring for the OutStream factory")
131 134 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
132 135 config=True, help="The importstring for the DisplayHook factory")
133 136
134 137 # polling
135 138 parent = Integer(0, config=True,
136 139 help="""kill this process if its parent dies. On Windows, the argument
137 140 specifies the HANDLE of the parent process, otherwise it is simply boolean.
138 141 """)
139 142 interrupt = Integer(0, config=True,
140 143 help="""ONLY USED ON WINDOWS
141 144 Interrupt this process when the parent is signalled.
142 145 """)
143 146
144 147 def init_crash_handler(self):
145 148 # Install minimal exception handling
146 149 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
147 150 ostream=sys.__stdout__)
148 151
149 152 def init_poller(self):
150 153 if sys.platform == 'win32':
151 154 if self.interrupt or self.parent:
152 155 self.poller = ParentPollerWindows(self.interrupt, self.parent)
153 156 elif self.parent:
154 157 self.poller = ParentPollerUnix()
155 158
156 159 def _bind_socket(self, s, port):
157 iface = 'tcp://%s' % self.ip
158 if port <= 0:
160 iface = '%s://%s' % (self.transport, self.ip)
161 if port <= 0 and self.transport == 'tcp':
159 162 port = s.bind_to_random_port(iface)
160 163 else:
161 s.bind(iface + ':%i'%port)
164 c = ':' if self.transport == 'tcp' else '-'
165 s.bind(iface + c + str(port))
162 166 return port
163 167
164 168 def load_connection_file(self):
165 169 """load ip/port/hmac config from JSON connection file"""
166 170 try:
167 171 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
168 172 except IOError:
169 173 self.log.debug("Connection file not found: %s", self.connection_file)
170 174 # This means I own it, so I will clean it up:
171 175 atexit.register(self.cleanup_connection_file)
172 176 return
173 177 self.log.debug(u"Loading connection file %s", fname)
174 178 with open(fname) as f:
175 179 s = f.read()
176 180 cfg = json.loads(s)
181 self.transport = cfg.get('transport', self.transport)
177 182 if self.ip == LOCALHOST and 'ip' in cfg:
178 183 # not overridden by config or cl_args
179 184 self.ip = cfg['ip']
180 185 for channel in ('hb', 'shell', 'iopub', 'stdin'):
181 186 name = channel + '_port'
182 187 if getattr(self, name) == 0 and name in cfg:
183 188 # not overridden by config or cl_args
184 189 setattr(self, name, cfg[name])
185 190 if 'key' in cfg:
186 191 self.config.Session.key = str_to_bytes(cfg['key'])
187 192
188 193 def write_connection_file(self):
189 194 """write connection info to JSON file"""
190 195 if os.path.basename(self.connection_file) == self.connection_file:
191 196 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
192 197 else:
193 198 cf = self.connection_file
194 write_connection_file(cf, ip=self.ip, key=self.session.key,
199 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
195 200 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
196 201 iopub_port=self.iopub_port)
197 202
198 203 self._full_connection_file = cf
199 204
200 205 def cleanup_connection_file(self):
201 206 cf = self._full_connection_file
202 207 self.log.debug("cleaning up connection file: %r", cf)
203 208 try:
204 209 os.remove(cf)
205 210 except (IOError, OSError):
206 211 pass
212
213 self._cleanup_ipc_files()
214
215 def _cleanup_ipc_files(self):
216 """cleanup ipc files if we wrote them"""
217 if self.transport != 'ipc':
218 return
219 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
220 ipcfile = "%s-%i" % (self.ip, port)
221 try:
222 os.remove(ipcfile)
223 except (IOError, OSError):
224 pass
207 225
208 226 def init_connection_file(self):
209 227 if not self.connection_file:
210 228 self.connection_file = "kernel-%s.json"%os.getpid()
211 229 try:
212 230 self.load_connection_file()
213 231 except Exception:
214 232 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
215 233 self.exit(1)
216 234
217 235 def init_sockets(self):
218 236 # Create a context, a session, and the kernel sockets.
219 237 self.log.info("Starting the kernel at pid: %i", os.getpid())
220 238 context = zmq.Context.instance()
221 239 # Uncomment this to try closing the context.
222 240 # atexit.register(context.term)
223 241
224 242 self.shell_socket = context.socket(zmq.ROUTER)
225 243 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
226 244 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
227 245
228 246 self.iopub_socket = context.socket(zmq.PUB)
229 247 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
230 248 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
231 249
232 250 self.stdin_socket = context.socket(zmq.ROUTER)
233 251 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
234 252 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
235 253
236 254 def init_heartbeat(self):
237 255 """start the heart beating"""
238 256 # heartbeat doesn't share context, because it mustn't be blocked
239 257 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
240 258 hb_ctx = zmq.Context()
241 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
259 self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port))
242 260 self.hb_port = self.heartbeat.port
243 261 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
244 262 self.heartbeat.start()
245 263
246 264 # Helper to make it easier to connect to an existing kernel.
247 265 # set log-level to critical, to make sure it is output
248 266 self.log.critical("To connect another client to this kernel, use:")
249 267
250 268 def log_connection_info(self):
251 269 """display connection info, and store ports"""
252 270 basename = os.path.basename(self.connection_file)
253 271 if basename == self.connection_file or \
254 272 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
255 273 # use shortname
256 274 tail = basename
257 275 if self.profile != 'default':
258 276 tail += " --profile %s" % self.profile
259 277 else:
260 278 tail = self.connection_file
261 279 self.log.critical("--existing %s", tail)
262 280
263 281
264 282 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
265 283 stdin=self.stdin_port, hb=self.hb_port)
266 284
267 285 def init_session(self):
268 286 """create our session object"""
269 287 default_secure(self.config)
270 288 self.session = Session(config=self.config, username=u'kernel')
271 289
272 290 def init_blackhole(self):
273 291 """redirects stdout/stderr to devnull if necessary"""
274 292 if self.no_stdout or self.no_stderr:
275 293 blackhole = open(os.devnull, 'w')
276 294 if self.no_stdout:
277 295 sys.stdout = sys.__stdout__ = blackhole
278 296 if self.no_stderr:
279 297 sys.stderr = sys.__stderr__ = blackhole
280 298
281 299 def init_io(self):
282 300 """Redirect input streams and set a display hook."""
283 301 if self.outstream_class:
284 302 outstream_factory = import_item(str(self.outstream_class))
285 303 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
286 304 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
287 305 if self.displayhook_class:
288 306 displayhook_factory = import_item(str(self.displayhook_class))
289 307 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
290 308
291 309 def init_signal(self):
292 310 signal.signal(signal.SIGINT, signal.SIG_IGN)
293 311
294 312 def init_kernel(self):
295 313 """Create the Kernel object itself"""
296 314 kernel_factory = import_item(str(self.kernel_class))
297 315 self.kernel = kernel_factory(config=self.config, session=self.session,
298 316 shell_socket=self.shell_socket,
299 317 iopub_socket=self.iopub_socket,
300 318 stdin_socket=self.stdin_socket,
301 319 log=self.log
302 320 )
303 321 self.kernel.record_ports(self.ports)
304 322
305 323 @catch_config_error
306 324 def initialize(self, argv=None):
307 325 super(KernelApp, self).initialize(argv)
308 326 self.init_blackhole()
309 327 self.init_connection_file()
310 328 self.init_session()
311 329 self.init_poller()
312 330 self.init_sockets()
313 331 self.init_heartbeat()
314 332 # writing/displaying connection info must be *after* init_sockets/heartbeat
315 333 self.log_connection_info()
316 334 self.write_connection_file()
317 335 self.init_io()
318 336 self.init_signal()
319 337 self.init_kernel()
320 338 # flush stdout/stderr, so that anything written to these streams during
321 339 # initialization do not get associated with the first execution request
322 340 sys.stdout.flush()
323 341 sys.stderr.flush()
324 342
325 343 def start(self):
326 344 if self.poller is not None:
327 345 self.poller.start()
328 346 self.kernel.start()
329 347 try:
330 348 ioloop.IOLoop.instance().start()
331 349 except KeyboardInterrupt:
332 350 pass
333 351
@@ -1,1012 +1,1042 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Constants and exceptions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 class InvalidPortNumber(Exception):
51 51 pass
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Utility functions
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # some utilities to validate message structure, these might get moved elsewhere
58 58 # if they prove to have more generic utility
59 59
60 60 def validate_string_list(lst):
61 61 """Validate that the input is a list of strings.
62 62
63 63 Raises ValueError if not."""
64 64 if not isinstance(lst, list):
65 65 raise ValueError('input %r must be a list' % lst)
66 66 for x in lst:
67 67 if not isinstance(x, basestring):
68 68 raise ValueError('element %r in list must be a string' % x)
69 69
70 70
71 71 def validate_string_dict(dct):
72 72 """Validate that the input is a dict with string keys and values.
73 73
74 74 Raises ValueError if not."""
75 75 for k,v in dct.iteritems():
76 76 if not isinstance(k, basestring):
77 77 raise ValueError('key %r in dict must be a string' % k)
78 78 if not isinstance(v, basestring):
79 79 raise ValueError('value %r in dict must be a string' % v)
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # ZMQ Socket Channel classes
84 84 #-----------------------------------------------------------------------------
85 85
86 86 class ZMQSocketChannel(Thread):
87 87 """The base class for the channels that use ZMQ sockets.
88 88 """
89 89 context = None
90 90 session = None
91 91 socket = None
92 92 ioloop = None
93 93 stream = None
94 94 _address = None
95 95 _exiting = False
96 96
97 97 def __init__(self, context, session, address):
98 98 """Create a channel
99 99
100 100 Parameters
101 101 ----------
102 102 context : :class:`zmq.Context`
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 address : tuple
106 address : zmq url
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
110 110 self.daemon = True
111 111
112 112 self.context = context
113 113 self.session = session
114 if address[1] == 0:
115 message = 'The port number for a channel cannot be 0.'
116 raise InvalidPortNumber(message)
114 if isinstance(address, tuple):
115 if address[1] == 0:
116 message = 'The port number for a channel cannot be 0.'
117 raise InvalidPortNumber(message)
118 address = "tcp://%s:%i" % address
117 119 self._address = address
118 120 atexit.register(self._notice_exit)
119 121
120 122 def _notice_exit(self):
121 123 self._exiting = True
122 124
123 125 def _run_loop(self):
124 126 """Run my loop, ignoring EINTR events in the poller"""
125 127 while True:
126 128 try:
127 129 self.ioloop.start()
128 130 except ZMQError as e:
129 131 if e.errno == errno.EINTR:
130 132 continue
131 133 else:
132 134 raise
133 135 except Exception:
134 136 if self._exiting:
135 137 break
136 138 else:
137 139 raise
138 140 else:
139 141 break
140 142
141 143 def stop(self):
142 144 """Stop the channel's activity.
143 145
144 146 This calls :method:`Thread.join` and returns when the thread
145 147 terminates. :class:`RuntimeError` will be raised if
146 148 :method:`self.start` is called again.
147 149 """
148 150 self.join()
149 151
150 152 @property
151 153 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
153
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
154 """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555').
156 155 """
157 156 return self._address
158 157
159 158 def _queue_send(self, msg):
160 159 """Queue a message to be sent from the IOLoop's thread.
161 160
162 161 Parameters
163 162 ----------
164 163 msg : message to send
165 164
166 165 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 166 thread control of the action.
168 167 """
169 168 def thread_send():
170 169 self.session.send(self.stream, msg)
171 170 self.ioloop.add_callback(thread_send)
172 171
173 172 def _handle_recv(self, msg):
174 173 """callback for stream.on_recv
175 174
176 175 unpacks message, and calls handlers with it.
177 176 """
178 177 ident,smsg = self.session.feed_identities(msg)
179 178 self.call_handlers(self.session.unserialize(smsg))
180 179
181 180
182 181
183 182 class ShellSocketChannel(ZMQSocketChannel):
184 183 """The DEALER channel for issues request/replies to the kernel.
185 184 """
186 185
187 186 command_queue = None
188 187 # flag for whether execute requests should be allowed to call raw_input:
189 188 allow_stdin = True
190 189
191 190 def __init__(self, context, session, address):
192 191 super(ShellSocketChannel, self).__init__(context, session, address)
193 192 self.ioloop = ioloop.IOLoop()
194 193
195 194 def run(self):
196 195 """The thread's main activity. Call start() instead."""
197 196 self.socket = self.context.socket(zmq.DEALER)
198 197 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
198 self.socket.connect(self.address)
200 199 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 200 self.stream.on_recv(self._handle_recv)
202 201 self._run_loop()
203 202 try:
204 203 self.socket.close()
205 204 except:
206 205 pass
207 206
208 207 def stop(self):
209 208 self.ioloop.stop()
210 209 super(ShellSocketChannel, self).stop()
211 210
212 211 def call_handlers(self, msg):
213 212 """This method is called in the ioloop thread when a message arrives.
214 213
215 214 Subclasses should override this method to handle incoming messages.
216 215 It is important to remember that this method is called in the thread
217 216 so that some logic must be done to ensure that the application leve
218 217 handlers are called in the application thread.
219 218 """
220 219 raise NotImplementedError('call_handlers must be defined in a subclass.')
221 220
222 221 def execute(self, code, silent=False, store_history=True,
223 222 user_variables=None, user_expressions=None, allow_stdin=None):
224 223 """Execute code in the kernel.
225 224
226 225 Parameters
227 226 ----------
228 227 code : str
229 228 A string of Python code.
230 229
231 230 silent : bool, optional (default False)
232 231 If set, the kernel will execute the code as quietly possible, and
233 232 will force store_history to be False.
234 233
235 234 store_history : bool, optional (default True)
236 235 If set, the kernel will store command history. This is forced
237 236 to be False if silent is True.
238 237
239 238 user_variables : list, optional
240 239 A list of variable names to pull from the user's namespace. They
241 240 will come back as a dict with these names as keys and their
242 241 :func:`repr` as values.
243 242
244 243 user_expressions : dict, optional
245 244 A dict mapping names to expressions to be evaluated in the user's
246 245 dict. The expression values are returned as strings formatted using
247 246 :func:`repr`.
248 247
249 248 allow_stdin : bool, optional (default self.allow_stdin)
250 249 Flag for whether the kernel can send stdin requests to frontends.
251 250
252 251 Some frontends (e.g. the Notebook) do not support stdin requests.
253 252 If raw_input is called from code executed from such a frontend, a
254 253 StdinNotImplementedError will be raised.
255 254
256 255 Returns
257 256 -------
258 257 The msg_id of the message sent.
259 258 """
260 259 if user_variables is None:
261 260 user_variables = []
262 261 if user_expressions is None:
263 262 user_expressions = {}
264 263 if allow_stdin is None:
265 264 allow_stdin = self.allow_stdin
266 265
267 266
268 267 # Don't waste network traffic if inputs are invalid
269 268 if not isinstance(code, basestring):
270 269 raise ValueError('code %r must be a string' % code)
271 270 validate_string_list(user_variables)
272 271 validate_string_dict(user_expressions)
273 272
274 273 # Create class for content/msg creation. Related to, but possibly
275 274 # not in Session.
276 275 content = dict(code=code, silent=silent, store_history=store_history,
277 276 user_variables=user_variables,
278 277 user_expressions=user_expressions,
279 278 allow_stdin=allow_stdin,
280 279 )
281 280 msg = self.session.msg('execute_request', content)
282 281 self._queue_send(msg)
283 282 return msg['header']['msg_id']
284 283
285 284 def complete(self, text, line, cursor_pos, block=None):
286 285 """Tab complete text in the kernel's namespace.
287 286
288 287 Parameters
289 288 ----------
290 289 text : str
291 290 The text to complete.
292 291 line : str
293 292 The full line of text that is the surrounding context for the
294 293 text to complete.
295 294 cursor_pos : int
296 295 The position of the cursor in the line where the completion was
297 296 requested.
298 297 block : str, optional
299 298 The full block of code in which the completion is being requested.
300 299
301 300 Returns
302 301 -------
303 302 The msg_id of the message sent.
304 303 """
305 304 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
306 305 msg = self.session.msg('complete_request', content)
307 306 self._queue_send(msg)
308 307 return msg['header']['msg_id']
309 308
310 309 def object_info(self, oname, detail_level=0):
311 310 """Get metadata information about an object.
312 311
313 312 Parameters
314 313 ----------
315 314 oname : str
316 315 A string specifying the object name.
317 316 detail_level : int, optional
318 317 The level of detail for the introspection (0-2)
319 318
320 319 Returns
321 320 -------
322 321 The msg_id of the message sent.
323 322 """
324 323 content = dict(oname=oname, detail_level=detail_level)
325 324 msg = self.session.msg('object_info_request', content)
326 325 self._queue_send(msg)
327 326 return msg['header']['msg_id']
328 327
329 328 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
330 329 """Get entries from the history list.
331 330
332 331 Parameters
333 332 ----------
334 333 raw : bool
335 334 If True, return the raw input.
336 335 output : bool
337 336 If True, then return the output as well.
338 337 hist_access_type : str
339 338 'range' (fill in session, start and stop params), 'tail' (fill in n)
340 339 or 'search' (fill in pattern param).
341 340
342 341 session : int
343 342 For a range request, the session from which to get lines. Session
344 343 numbers are positive integers; negative ones count back from the
345 344 current session.
346 345 start : int
347 346 The first line number of a history range.
348 347 stop : int
349 348 The final (excluded) line number of a history range.
350 349
351 350 n : int
352 351 The number of lines of history to get for a tail request.
353 352
354 353 pattern : str
355 354 The glob-syntax pattern for a search request.
356 355
357 356 Returns
358 357 -------
359 358 The msg_id of the message sent.
360 359 """
361 360 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
362 361 **kwargs)
363 362 msg = self.session.msg('history_request', content)
364 363 self._queue_send(msg)
365 364 return msg['header']['msg_id']
366 365
367 366 def shutdown(self, restart=False):
368 367 """Request an immediate kernel shutdown.
369 368
370 369 Upon receipt of the (empty) reply, client code can safely assume that
371 370 the kernel has shut down and it's safe to forcefully terminate it if
372 371 it's still alive.
373 372
374 373 The kernel will send the reply via a function registered with Python's
375 374 atexit module, ensuring it's truly done as the kernel is done with all
376 375 normal operation.
377 376 """
378 377 # Send quit message to kernel. Once we implement kernel-side setattr,
379 378 # this should probably be done that way, but for now this will do.
380 379 msg = self.session.msg('shutdown_request', {'restart':restart})
381 380 self._queue_send(msg)
382 381 return msg['header']['msg_id']
383 382
384 383
385 384
386 385 class SubSocketChannel(ZMQSocketChannel):
387 386 """The SUB channel which listens for messages that the kernel publishes.
388 387 """
389 388
390 389 def __init__(self, context, session, address):
391 390 super(SubSocketChannel, self).__init__(context, session, address)
392 391 self.ioloop = ioloop.IOLoop()
393 392
394 393 def run(self):
395 394 """The thread's main activity. Call start() instead."""
396 395 self.socket = self.context.socket(zmq.SUB)
397 396 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
398 397 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
399 self.socket.connect('tcp://%s:%i' % self.address)
398 self.socket.connect(self.address)
400 399 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
401 400 self.stream.on_recv(self._handle_recv)
402 401 self._run_loop()
403 402 try:
404 403 self.socket.close()
405 404 except:
406 405 pass
407 406
408 407 def stop(self):
409 408 self.ioloop.stop()
410 409 super(SubSocketChannel, self).stop()
411 410
412 411 def call_handlers(self, msg):
413 412 """This method is called in the ioloop thread when a message arrives.
414 413
415 414 Subclasses should override this method to handle incoming messages.
416 415 It is important to remember that this method is called in the thread
417 416 so that some logic must be done to ensure that the application leve
418 417 handlers are called in the application thread.
419 418 """
420 419 raise NotImplementedError('call_handlers must be defined in a subclass.')
421 420
422 421 def flush(self, timeout=1.0):
423 422 """Immediately processes all pending messages on the SUB channel.
424 423
425 424 Callers should use this method to ensure that :method:`call_handlers`
426 425 has been called for all messages that have been received on the
427 426 0MQ SUB socket of this channel.
428 427
429 428 This method is thread safe.
430 429
431 430 Parameters
432 431 ----------
433 432 timeout : float, optional
434 433 The maximum amount of time to spend flushing, in seconds. The
435 434 default is one second.
436 435 """
437 436 # We do the IOLoop callback process twice to ensure that the IOLoop
438 437 # gets to perform at least one full poll.
439 438 stop_time = time.time() + timeout
440 439 for i in xrange(2):
441 440 self._flushed = False
442 441 self.ioloop.add_callback(self._flush)
443 442 while not self._flushed and time.time() < stop_time:
444 443 time.sleep(0.01)
445 444
446 445 def _flush(self):
447 446 """Callback for :method:`self.flush`."""
448 447 self.stream.flush()
449 448 self._flushed = True
450 449
451 450
452 451 class StdInSocketChannel(ZMQSocketChannel):
453 452 """A reply channel to handle raw_input requests that the kernel makes."""
454 453
455 454 msg_queue = None
456 455
457 456 def __init__(self, context, session, address):
458 457 super(StdInSocketChannel, self).__init__(context, session, address)
459 458 self.ioloop = ioloop.IOLoop()
460 459
461 460 def run(self):
462 461 """The thread's main activity. Call start() instead."""
463 462 self.socket = self.context.socket(zmq.DEALER)
464 463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
465 self.socket.connect('tcp://%s:%i' % self.address)
464 self.socket.connect(self.address)
466 465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
467 466 self.stream.on_recv(self._handle_recv)
468 467 self._run_loop()
469 468 try:
470 469 self.socket.close()
471 470 except:
472 471 pass
473 472
474 473
475 474 def stop(self):
476 475 self.ioloop.stop()
477 476 super(StdInSocketChannel, self).stop()
478 477
479 478 def call_handlers(self, msg):
480 479 """This method is called in the ioloop thread when a message arrives.
481 480
482 481 Subclasses should override this method to handle incoming messages.
483 482 It is important to remember that this method is called in the thread
484 483 so that some logic must be done to ensure that the application leve
485 484 handlers are called in the application thread.
486 485 """
487 486 raise NotImplementedError('call_handlers must be defined in a subclass.')
488 487
489 488 def input(self, string):
490 489 """Send a string of raw input to the kernel."""
491 490 content = dict(value=string)
492 491 msg = self.session.msg('input_reply', content)
493 492 self._queue_send(msg)
494 493
495 494
496 495 class HBSocketChannel(ZMQSocketChannel):
497 496 """The heartbeat channel which monitors the kernel heartbeat.
498 497
499 498 Note that the heartbeat channel is paused by default. As long as you start
500 499 this channel, the kernel manager will ensure that it is paused and un-paused
501 500 as appropriate.
502 501 """
503 502
504 503 time_to_dead = 3.0
505 504 socket = None
506 505 poller = None
507 506 _running = None
508 507 _pause = None
509 508 _beating = None
510 509
511 510 def __init__(self, context, session, address):
512 511 super(HBSocketChannel, self).__init__(context, session, address)
513 512 self._running = False
514 513 self._pause =True
515 514 self.poller = zmq.Poller()
516 515
517 516 def _create_socket(self):
518 517 if self.socket is not None:
519 518 # close previous socket, before opening a new one
520 519 self.poller.unregister(self.socket)
521 520 self.socket.close()
522 521 self.socket = self.context.socket(zmq.REQ)
523 522 self.socket.setsockopt(zmq.LINGER, 0)
524 self.socket.connect('tcp://%s:%i' % self.address)
523 self.socket.connect(self.address)
525 524
526 525 self.poller.register(self.socket, zmq.POLLIN)
527 526
528 527 def _poll(self, start_time):
529 528 """poll for heartbeat replies until we reach self.time_to_dead
530 529
531 530 Ignores interrupts, and returns the result of poll(), which
532 531 will be an empty list if no messages arrived before the timeout,
533 532 or the event tuple if there is a message to receive.
534 533 """
535 534
536 535 until_dead = self.time_to_dead - (time.time() - start_time)
537 536 # ensure poll at least once
538 537 until_dead = max(until_dead, 1e-3)
539 538 events = []
540 539 while True:
541 540 try:
542 541 events = self.poller.poll(1000 * until_dead)
543 542 except ZMQError as e:
544 543 if e.errno == errno.EINTR:
545 544 # ignore interrupts during heartbeat
546 545 # this may never actually happen
547 546 until_dead = self.time_to_dead - (time.time() - start_time)
548 547 until_dead = max(until_dead, 1e-3)
549 548 pass
550 549 else:
551 550 raise
552 551 except Exception:
553 552 if self._exiting:
554 553 break
555 554 else:
556 555 raise
557 556 else:
558 557 break
559 558 return events
560 559
561 560 def run(self):
562 561 """The thread's main activity. Call start() instead."""
563 562 self._create_socket()
564 563 self._running = True
565 564 self._beating = True
566 565
567 566 while self._running:
568 567 if self._pause:
569 568 # just sleep, and skip the rest of the loop
570 569 time.sleep(self.time_to_dead)
571 570 continue
572 571
573 572 since_last_heartbeat = 0.0
574 573 # io.rprint('Ping from HB channel') # dbg
575 574 # no need to catch EFSM here, because the previous event was
576 575 # either a recv or connect, which cannot be followed by EFSM
577 576 self.socket.send(b'ping')
578 577 request_time = time.time()
579 578 ready = self._poll(request_time)
580 579 if ready:
581 580 self._beating = True
582 581 # the poll above guarantees we have something to recv
583 582 self.socket.recv()
584 583 # sleep the remainder of the cycle
585 584 remainder = self.time_to_dead - (time.time() - request_time)
586 585 if remainder > 0:
587 586 time.sleep(remainder)
588 587 continue
589 588 else:
590 589 # nothing was received within the time limit, signal heart failure
591 590 self._beating = False
592 591 since_last_heartbeat = time.time() - request_time
593 592 self.call_handlers(since_last_heartbeat)
594 593 # and close/reopen the socket, because the REQ/REP cycle has been broken
595 594 self._create_socket()
596 595 continue
597 596 try:
598 597 self.socket.close()
599 598 except:
600 599 pass
601 600
602 601 def pause(self):
603 602 """Pause the heartbeat."""
604 603 self._pause = True
605 604
606 605 def unpause(self):
607 606 """Unpause the heartbeat."""
608 607 self._pause = False
609 608
610 609 def is_beating(self):
611 610 """Is the heartbeat running and responsive (and not paused)."""
612 611 if self.is_alive() and not self._pause and self._beating:
613 612 return True
614 613 else:
615 614 return False
616 615
617 616 def stop(self):
618 617 self._running = False
619 618 super(HBSocketChannel, self).stop()
620 619
621 620 def call_handlers(self, since_last_heartbeat):
622 621 """This method is called in the ioloop thread when a message arrives.
623 622
624 623 Subclasses should override this method to handle incoming messages.
625 624 It is important to remember that this method is called in the thread
626 625 so that some logic must be done to ensure that the application level
627 626 handlers are called in the application thread.
628 627 """
629 628 raise NotImplementedError('call_handlers must be defined in a subclass.')
630 629
631 630
632 631 #-----------------------------------------------------------------------------
633 632 # Main kernel manager class
634 633 #-----------------------------------------------------------------------------
635 634
636 635 class KernelManager(HasTraits):
637 636 """ Manages a kernel for a frontend.
638 637
639 638 The SUB channel is for the frontend to receive messages published by the
640 639 kernel.
641 640
642 641 The REQ channel is for the frontend to make requests of the kernel.
643 642
644 643 The REP channel is for the kernel to request stdin (raw_input) from the
645 644 frontend.
646 645 """
647 646 # config object for passing to child configurables
648 647 config = Instance(Config)
649 648
650 649 # The PyZMQ Context to use for communication with the kernel.
651 650 context = Instance(zmq.Context)
652 651 def _context_default(self):
653 652 return zmq.Context.instance()
654 653
655 654 # The Session to use for communication with the kernel.
656 655 session = Instance(Session)
657 656
658 657 # The kernel process with which the KernelManager is communicating.
659 658 kernel = Instance(Popen)
660 659
661 660 # The addresses for the communication channels.
662 661 connection_file = Unicode('')
662
663 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp')
664
665
663 666 ip = Unicode(LOCALHOST)
664 667 def _ip_changed(self, name, old, new):
665 668 if new == '*':
666 669 self.ip = '0.0.0.0'
667 670 shell_port = Integer(0)
668 671 iopub_port = Integer(0)
669 672 stdin_port = Integer(0)
670 673 hb_port = Integer(0)
671 674
672 675 # The classes to use for the various channels.
673 676 shell_channel_class = Type(ShellSocketChannel)
674 677 sub_channel_class = Type(SubSocketChannel)
675 678 stdin_channel_class = Type(StdInSocketChannel)
676 679 hb_channel_class = Type(HBSocketChannel)
677 680
678 681 # Protected traits.
679 682 _launch_args = Any
680 683 _shell_channel = Any
681 684 _sub_channel = Any
682 685 _stdin_channel = Any
683 686 _hb_channel = Any
684 687 _connection_file_written=Bool(False)
685 688
686 689 def __init__(self, **kwargs):
687 690 super(KernelManager, self).__init__(**kwargs)
688 691 if self.session is None:
689 692 self.session = Session(config=self.config)
690 693
691 694 def __del__(self):
692 695 self.cleanup_connection_file()
693 696
694 697
695 698 #--------------------------------------------------------------------------
696 699 # Channel management methods:
697 700 #--------------------------------------------------------------------------
698 701
699 702 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
700 703 """Starts the channels for this kernel.
701 704
702 705 This will create the channels if they do not exist and then start
703 706 them. If port numbers of 0 are being used (random ports) then you
704 707 must first call :method:`start_kernel`. If the channels have been
705 708 stopped and you call this, :class:`RuntimeError` will be raised.
706 709 """
707 710 if shell:
708 711 self.shell_channel.start()
709 712 if sub:
710 713 self.sub_channel.start()
711 714 if stdin:
712 715 self.stdin_channel.start()
713 716 self.shell_channel.allow_stdin = True
714 717 else:
715 718 self.shell_channel.allow_stdin = False
716 719 if hb:
717 720 self.hb_channel.start()
718 721
719 722 def stop_channels(self):
720 723 """Stops all the running channels for this kernel.
721 724 """
722 725 if self.shell_channel.is_alive():
723 726 self.shell_channel.stop()
724 727 if self.sub_channel.is_alive():
725 728 self.sub_channel.stop()
726 729 if self.stdin_channel.is_alive():
727 730 self.stdin_channel.stop()
728 731 if self.hb_channel.is_alive():
729 732 self.hb_channel.stop()
730 733
731 734 @property
732 735 def channels_running(self):
733 736 """Are any of the channels created and running?"""
734 737 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
735 738 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
736 739
737 740 #--------------------------------------------------------------------------
738 741 # Kernel process management methods:
739 742 #--------------------------------------------------------------------------
740 743
741 744 def cleanup_connection_file(self):
742 745 """cleanup connection file *if we wrote it*
743 746
744 747 Will not raise if the connection file was already removed somehow.
745 748 """
746 749 if self._connection_file_written:
747 750 # cleanup connection files on full shutdown of kernel we started
748 751 self._connection_file_written = False
749 752 try:
750 753 os.remove(self.connection_file)
751 except OSError:
754 except (IOError, OSError):
755 pass
756
757 self._cleanup_ipc_files()
758
759 def _cleanup_ipc_files(self):
760 """cleanup ipc files if we wrote them"""
761 if self.transport != 'ipc':
762 return
763 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
764 ipcfile = "%s-%i" % (self.ip, port)
765 try:
766 os.remove(ipcfile)
767 except (IOError, OSError):
752 768 pass
753 769
754 770 def load_connection_file(self):
755 771 """load connection info from JSON dict in self.connection_file"""
756 772 with open(self.connection_file) as f:
757 773 cfg = json.loads(f.read())
758 774
775 from pprint import pprint
776 pprint(cfg)
777 self.transport = cfg.get('transport', 'tcp')
759 778 self.ip = cfg['ip']
760 779 self.shell_port = cfg['shell_port']
761 780 self.stdin_port = cfg['stdin_port']
762 781 self.iopub_port = cfg['iopub_port']
763 782 self.hb_port = cfg['hb_port']
764 783 self.session.key = str_to_bytes(cfg['key'])
765 784
766 785 def write_connection_file(self):
767 786 """write connection info to JSON dict in self.connection_file"""
768 787 if self._connection_file_written:
769 788 return
770 789 self.connection_file,cfg = write_connection_file(self.connection_file,
771 ip=self.ip, key=self.session.key,
790 transport=self.transport, ip=self.ip, key=self.session.key,
772 791 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
773 792 shell_port=self.shell_port, hb_port=self.hb_port)
774 793 # write_connection_file also sets default ports:
775 794 self.shell_port = cfg['shell_port']
776 795 self.stdin_port = cfg['stdin_port']
777 796 self.iopub_port = cfg['iopub_port']
778 797 self.hb_port = cfg['hb_port']
779 798
780 799 self._connection_file_written = True
781 800
782 801 def start_kernel(self, **kw):
783 802 """Starts a kernel process and configures the manager to use it.
784 803
785 804 If random ports (port=0) are being used, this method must be called
786 805 before the channels are created.
787 806
788 807 Parameters:
789 808 -----------
790 809 launcher : callable, optional (default None)
791 810 A custom function for launching the kernel process (generally a
792 811 wrapper around ``entry_point.base_launch_kernel``). In most cases,
793 812 it should not be necessary to use this parameter.
794 813
795 814 **kw : optional
796 815 See respective options for IPython and Python kernels.
797 816 """
798 if self.ip not in LOCAL_IPS:
817 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
799 818 raise RuntimeError("Can only launch a kernel on a local interface. "
800 819 "Make sure that the '*_address' attributes are "
801 820 "configured properly. "
802 821 "Currently valid addresses are: %s"%LOCAL_IPS
803 822 )
804 823
805 824 # write connection file / get default ports
806 825 self.write_connection_file()
807 826
808 827 self._launch_args = kw.copy()
809 828 launch_kernel = kw.pop('launcher', None)
810 829 if launch_kernel is None:
811 830 from ipkernel import launch_kernel
812 831 self.kernel = launch_kernel(fname=self.connection_file, **kw)
813 832
814 833 def shutdown_kernel(self, restart=False):
815 834 """ Attempts to the stop the kernel process cleanly.
816 835
817 836 If the kernel cannot be stopped and the kernel is local, it is killed.
818 837 """
819 838 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
820 839 if sys.platform == 'win32':
821 840 self.kill_kernel()
822 841 return
823 842
824 843 # Pause the heart beat channel if it exists.
825 844 if self._hb_channel is not None:
826 845 self._hb_channel.pause()
827 846
828 847 # Don't send any additional kernel kill messages immediately, to give
829 848 # the kernel a chance to properly execute shutdown actions. Wait for at
830 849 # most 1s, checking every 0.1s.
831 850 self.shell_channel.shutdown(restart=restart)
832 851 for i in range(10):
833 852 if self.is_alive:
834 853 time.sleep(0.1)
835 854 else:
836 855 break
837 856 else:
838 857 # OK, we've waited long enough.
839 858 if self.has_kernel:
840 859 self.kill_kernel()
841 860
842 861 if not restart and self._connection_file_written:
843 862 # cleanup connection files on full shutdown of kernel we started
844 863 self._connection_file_written = False
845 864 try:
846 865 os.remove(self.connection_file)
847 866 except IOError:
848 867 pass
849 868
850 869 def restart_kernel(self, now=False, **kw):
851 870 """Restarts a kernel with the arguments that were used to launch it.
852 871
853 872 If the old kernel was launched with random ports, the same ports will be
854 873 used for the new kernel.
855 874
856 875 Parameters
857 876 ----------
858 877 now : bool, optional
859 878 If True, the kernel is forcefully restarted *immediately*, without
860 879 having a chance to do any cleanup action. Otherwise the kernel is
861 880 given 1s to clean up before a forceful restart is issued.
862 881
863 882 In all cases the kernel is restarted, the only difference is whether
864 883 it is given a chance to perform a clean shutdown or not.
865 884
866 885 **kw : optional
867 886 Any options specified here will replace those used to launch the
868 887 kernel.
869 888 """
870 889 if self._launch_args is None:
871 890 raise RuntimeError("Cannot restart the kernel. "
872 891 "No previous call to 'start_kernel'.")
873 892 else:
874 893 # Stop currently running kernel.
875 894 if self.has_kernel:
876 895 if now:
877 896 self.kill_kernel()
878 897 else:
879 898 self.shutdown_kernel(restart=True)
880 899
881 900 # Start new kernel.
882 901 self._launch_args.update(kw)
883 902 self.start_kernel(**self._launch_args)
884 903
885 904 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
886 905 # unless there is some delay here.
887 906 if sys.platform == 'win32':
888 907 time.sleep(0.2)
889 908
890 909 @property
891 910 def has_kernel(self):
892 911 """Returns whether a kernel process has been specified for the kernel
893 912 manager.
894 913 """
895 914 return self.kernel is not None
896 915
897 916 def kill_kernel(self):
898 917 """ Kill the running kernel.
899 918
900 919 This method blocks until the kernel process has terminated.
901 920 """
902 921 if self.has_kernel:
903 922 # Pause the heart beat channel if it exists.
904 923 if self._hb_channel is not None:
905 924 self._hb_channel.pause()
906 925
907 926 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
908 927 # TerminateProcess() on Win32).
909 928 try:
910 929 self.kernel.kill()
911 930 except OSError as e:
912 931 # In Windows, we will get an Access Denied error if the process
913 932 # has already terminated. Ignore it.
914 933 if sys.platform == 'win32':
915 934 if e.winerror != 5:
916 935 raise
917 936 # On Unix, we may get an ESRCH error if the process has already
918 937 # terminated. Ignore it.
919 938 else:
920 939 from errno import ESRCH
921 940 if e.errno != ESRCH:
922 941 raise
923 942
924 943 # Block until the kernel terminates.
925 944 self.kernel.wait()
926 945 self.kernel = None
927 946 else:
928 947 raise RuntimeError("Cannot kill kernel. No kernel is running!")
929 948
930 949 def interrupt_kernel(self):
931 950 """ Interrupts the kernel.
932 951
933 952 Unlike ``signal_kernel``, this operation is well supported on all
934 953 platforms.
935 954 """
936 955 if self.has_kernel:
937 956 if sys.platform == 'win32':
938 957 from parentpoller import ParentPollerWindows as Poller
939 958 Poller.send_interrupt(self.kernel.win32_interrupt_event)
940 959 else:
941 960 self.kernel.send_signal(signal.SIGINT)
942 961 else:
943 962 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
944 963
945 964 def signal_kernel(self, signum):
946 965 """ Sends a signal to the kernel.
947 966
948 967 Note that since only SIGTERM is supported on Windows, this function is
949 968 only useful on Unix systems.
950 969 """
951 970 if self.has_kernel:
952 971 self.kernel.send_signal(signum)
953 972 else:
954 973 raise RuntimeError("Cannot signal kernel. No kernel is running!")
955 974
956 975 @property
957 976 def is_alive(self):
958 977 """Is the kernel process still running?"""
959 978 if self.has_kernel:
960 979 if self.kernel.poll() is None:
961 980 return True
962 981 else:
963 982 return False
964 983 elif self._hb_channel is not None:
965 984 # We didn't start the kernel with this KernelManager so we
966 985 # use the heartbeat.
967 986 return self._hb_channel.is_beating()
968 987 else:
969 988 # no heartbeat and not local, we can't tell if it's running,
970 989 # so naively return True
971 990 return True
972 991
973 992 #--------------------------------------------------------------------------
974 993 # Channels used for communication with the kernel:
975 994 #--------------------------------------------------------------------------
976 995
996 def _make_url(self, port):
997 """make a zmq url with a port"""
998 if self.transport == 'tcp':
999 return "tcp://%s:%i" % (self.ip, port)
1000 else:
1001 return "%s://%s-%s" % (self.transport, self.ip, port)
1002
977 1003 @property
978 1004 def shell_channel(self):
979 1005 """Get the REQ socket channel object to make requests of the kernel."""
980 1006 if self._shell_channel is None:
981 1007 self._shell_channel = self.shell_channel_class(self.context,
982 self.session,
983 (self.ip, self.shell_port))
1008 self.session,
1009 self._make_url(self.shell_port),
1010 )
984 1011 return self._shell_channel
985 1012
986 1013 @property
987 1014 def sub_channel(self):
988 1015 """Get the SUB socket channel object."""
989 1016 if self._sub_channel is None:
990 1017 self._sub_channel = self.sub_channel_class(self.context,
991 1018 self.session,
992 (self.ip, self.iopub_port))
1019 self._make_url(self.iopub_port),
1020 )
993 1021 return self._sub_channel
994 1022
995 1023 @property
996 1024 def stdin_channel(self):
997 1025 """Get the REP socket channel object to handle stdin (raw_input)."""
998 1026 if self._stdin_channel is None:
999 1027 self._stdin_channel = self.stdin_channel_class(self.context,
1000 self.session,
1001 (self.ip, self.stdin_port))
1028 self.session,
1029 self._make_url(self.stdin_port),
1030 )
1002 1031 return self._stdin_channel
1003 1032
1004 1033 @property
1005 1034 def hb_channel(self):
1006 1035 """Get the heartbeat socket channel object to check that the
1007 1036 kernel is alive."""
1008 1037 if self._hb_channel is None:
1009 1038 self._hb_channel = self.hb_channel_class(self.context,
1010 self.session,
1011 (self.ip, self.hb_port))
1039 self.session,
1040 self._make_url(self.hb_port),
1041 )
1012 1042 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now