##// END OF EJS Templates
enable IPC transport for kernels...
MinRK -
Show More
@@ -1,356 +1,378 b''
1 1 """ A minimal application base mixin for all ZMQ based IPython frontends.
2 2
3 3 This is not a complete console app, as subprocess will not be able to receive
4 4 input, there is no real readline support, among other limitations. This is a
5 5 refactoring of what used to be the IPython/frontend/qt/console/qtconsoleapp.py
6 6
7 7 Authors:
8 8
9 9 * Evan Patterson
10 10 * Min RK
11 11 * Erik Tollerud
12 12 * Fernando Perez
13 13 * Bussonnier Matthias
14 14 * Thomas Kluyver
15 15 * Paul Ivanov
16 16
17 17 """
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 # stdlib imports
24 24 import atexit
25 25 import json
26 26 import os
27 import shutil
27 28 import signal
28 29 import sys
29 30 import uuid
30 31
31 32
32 33 # Local imports
33 34 from IPython.config.application import boolean_flag
34 35 from IPython.config.configurable import Configurable
35 36 from IPython.core.profiledir import ProfileDir
36 37 from IPython.lib.kernel import tunnel_to_kernel, find_connection_file, swallow_argv
37 38 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
38 39 from IPython.utils.path import filefind
39 40 from IPython.utils.py3compat import str_to_bytes
40 41 from IPython.utils.traitlets import (
41 Dict, List, Unicode, CUnicode, Int, CBool, Any
42 Dict, List, Unicode, CUnicode, Int, CBool, Any, CaselessStrEnum
42 43 )
43 44 from IPython.zmq.ipkernel import (
44 45 flags as ipkernel_flags,
45 46 aliases as ipkernel_aliases,
46 47 IPKernelApp
47 48 )
48 49 from IPython.zmq.session import Session, default_secure
49 50 from IPython.zmq.zmqshell import ZMQInteractiveShell
50 51
51 52 #-----------------------------------------------------------------------------
52 53 # Network Constants
53 54 #-----------------------------------------------------------------------------
54 55
55 56 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
56 57
57 58 #-----------------------------------------------------------------------------
58 59 # Globals
59 60 #-----------------------------------------------------------------------------
60 61
61 62
62 63 #-----------------------------------------------------------------------------
63 64 # Aliases and Flags
64 65 #-----------------------------------------------------------------------------
65 66
66 67 flags = dict(ipkernel_flags)
67 68
68 69 # the flags that are specific to the frontend
69 70 # these must be scrubbed before being passed to the kernel,
70 71 # or it will raise an error on unrecognized flags
71 72 app_flags = {
72 73 'existing' : ({'IPythonConsoleApp' : {'existing' : 'kernel*.json'}},
73 74 "Connect to an existing kernel. If no argument specified, guess most recent"),
74 75 }
75 76 app_flags.update(boolean_flag(
76 77 'confirm-exit', 'IPythonConsoleApp.confirm_exit',
77 78 """Set to display confirmation dialog on exit. You can always use 'exit' or 'quit',
78 79 to force a direct exit without any confirmation.
79 80 """,
80 81 """Don't prompt the user when exiting. This will terminate the kernel
81 82 if it is owned by the frontend, and leave it alive if it is external.
82 83 """
83 84 ))
84 85 flags.update(app_flags)
85 86
86 87 aliases = dict(ipkernel_aliases)
87 88
88 89 # also scrub aliases from the frontend
89 90 app_aliases = dict(
90 91 hb = 'IPythonConsoleApp.hb_port',
91 92 shell = 'IPythonConsoleApp.shell_port',
92 93 iopub = 'IPythonConsoleApp.iopub_port',
93 94 stdin = 'IPythonConsoleApp.stdin_port',
94 95 ip = 'IPythonConsoleApp.ip',
95 96 existing = 'IPythonConsoleApp.existing',
96 97 f = 'IPythonConsoleApp.connection_file',
97 98
98 99
99 100 ssh = 'IPythonConsoleApp.sshserver',
100 101 )
101 102 aliases.update(app_aliases)
102 103
103 104 #-----------------------------------------------------------------------------
104 105 # Classes
105 106 #-----------------------------------------------------------------------------
106 107
107 108 #-----------------------------------------------------------------------------
108 109 # IPythonConsole
109 110 #-----------------------------------------------------------------------------
110 111
111 112 classes = [IPKernelApp, ZMQInteractiveShell, ProfileDir, Session]
112 113
113 114 try:
114 115 from IPython.zmq.pylab.backend_inline import InlineBackend
115 116 except ImportError:
116 117 pass
117 118 else:
118 119 classes.append(InlineBackend)
119 120
120 121 class IPythonConsoleApp(Configurable):
121 122 name = 'ipython-console-mixin'
122 123 default_config_file_name='ipython_config.py'
123 124
124 125 description = """
125 126 The IPython Mixin Console.
126 127
127 128 This class contains the common portions of console client (QtConsole,
128 129 ZMQ-based terminal console, etc). It is not a full console, in that
129 130 launched terminal subprocesses will not be able to accept input.
130 131
131 132 The Console using this mixing supports various extra features beyond
132 133 the single-process Terminal IPython shell, such as connecting to
133 134 existing kernel, via:
134 135
135 136 ipython <appname> --existing
136 137
137 138 as well as tunnel via SSH
138 139
139 140 """
140 141
141 142 classes = classes
142 143 flags = Dict(flags)
143 144 aliases = Dict(aliases)
144 145 kernel_manager_class = BlockingKernelManager
145 146
146 147 kernel_argv = List(Unicode)
147 148 # frontend flags&aliases to be stripped when building kernel_argv
148 149 frontend_flags = Any(app_flags)
149 150 frontend_aliases = Any(app_aliases)
150 151
151 152 # create requested profiles by default, if they don't exist:
152 153 auto_create = CBool(True)
153 154 # connection info:
154 ip = Unicode(LOCALHOST, config=True,
155
156 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
157
158 ip = Unicode(config=True,
155 159 help="""Set the kernel\'s IP address [default localhost].
156 160 If the IP address is something other than localhost, then
157 161 Consoles on other machines will be able to connect
158 162 to the Kernel, so be careful!"""
159 163 )
164 def _ip_default(self):
165 if self.transport == 'tcp':
166 return LOCALHOST
167 else:
168 # this can fire early if ip is given,
169 # in which case our return value is meaningless
170 if not hasattr(self, 'profile_dir'):
171 return ''
172 ipcdir = os.path.join(self.profile_dir.security_dir, 'kernel-%s' % os.getpid())
173 os.makedirs(ipcdir)
174 atexit.register(lambda : shutil.rmtree(ipcdir))
175 return os.path.join(ipcdir, 'ipc')
160 176
161 177 sshserver = Unicode('', config=True,
162 178 help="""The SSH server to use to connect to the kernel.""")
163 179 sshkey = Unicode('', config=True,
164 180 help="""Path to the ssh key to use for logging in to the ssh server.""")
165 181
166 182 hb_port = Int(0, config=True,
167 183 help="set the heartbeat port [default: random]")
168 184 shell_port = Int(0, config=True,
169 help="set the shell (XREP) port [default: random]")
185 help="set the shell (ROUTER) port [default: random]")
170 186 iopub_port = Int(0, config=True,
171 187 help="set the iopub (PUB) port [default: random]")
172 188 stdin_port = Int(0, config=True,
173 help="set the stdin (XREQ) port [default: random]")
189 help="set the stdin (DEALER) port [default: random]")
174 190 connection_file = Unicode('', config=True,
175 191 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
176 192
177 193 This file will contain the IP, ports, and authentication key needed to connect
178 194 clients to this kernel. By default, this file will be created in the security-dir
179 195 of the current profile, but can be specified by absolute path.
180 196 """)
181 197 def _connection_file_default(self):
182 198 return 'kernel-%i.json' % os.getpid()
183 199
184 200 existing = CUnicode('', config=True,
185 201 help="""Connect to an already running kernel""")
186 202
187 203 confirm_exit = CBool(True, config=True,
188 204 help="""
189 205 Set to display confirmation dialog on exit. You can always use 'exit' or 'quit',
190 206 to force a direct exit without any confirmation.""",
191 207 )
192 208
193 209
194 210 def build_kernel_argv(self, argv=None):
195 211 """build argv to be passed to kernel subprocess"""
196 212 if argv is None:
197 213 argv = sys.argv[1:]
198 214 self.kernel_argv = swallow_argv(argv, self.frontend_aliases, self.frontend_flags)
199 215 # kernel should inherit default config file from frontend
200 216 self.kernel_argv.append("--KernelApp.parent_appname='%s'"%self.name)
201 217
202 218 def init_connection_file(self):
203 219 """find the connection file, and load the info if found.
204 220
205 221 The current working directory and the current profile's security
206 222 directory will be searched for the file if it is not given by
207 223 absolute path.
208 224
209 225 When attempting to connect to an existing kernel and the `--existing`
210 226 argument does not match an existing file, it will be interpreted as a
211 227 fileglob, and the matching file in the current profile's security dir
212 228 with the latest access time will be used.
213 229
214 230 After this method is called, self.connection_file contains the *full path*
215 231 to the connection file, never just its name.
216 232 """
217 233 if self.existing:
218 234 try:
219 235 cf = find_connection_file(self.existing)
220 236 except Exception:
221 237 self.log.critical("Could not find existing kernel connection file %s", self.existing)
222 238 self.exit(1)
223 239 self.log.info("Connecting to existing kernel: %s" % cf)
224 240 self.connection_file = cf
225 241 else:
226 242 # not existing, check if we are going to write the file
227 243 # and ensure that self.connection_file is a full path, not just the shortname
228 244 try:
229 245 cf = find_connection_file(self.connection_file)
230 246 except Exception:
231 247 # file might not exist
232 248 if self.connection_file == os.path.basename(self.connection_file):
233 249 # just shortname, put it in security dir
234 250 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
235 251 else:
236 252 cf = self.connection_file
237 253 self.connection_file = cf
238 254
239 255 # should load_connection_file only be used for existing?
240 256 # as it is now, this allows reusing ports if an existing
241 257 # file is requested
242 258 try:
243 259 self.load_connection_file()
244 260 except Exception:
245 261 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
246 262 self.exit(1)
247 263
248 264 def load_connection_file(self):
249 265 """load ip/port/hmac config from JSON connection file"""
250 266 # this is identical to KernelApp.load_connection_file
251 267 # perhaps it can be centralized somewhere?
252 268 try:
253 269 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
254 270 except IOError:
255 271 self.log.debug("Connection File not found: %s", self.connection_file)
256 272 return
257 273 self.log.debug(u"Loading connection file %s", fname)
258 274 with open(fname) as f:
259 s = f.read()
260 cfg = json.loads(s)
261 if self.ip == LOCALHOST and 'ip' in cfg:
262 # not overridden by config or cl_args
275 cfg = json.load(f)
276
277 self.transport = cfg.get('transport', 'tcp')
278 if 'ip' in cfg:
263 279 self.ip = cfg['ip']
264 280 for channel in ('hb', 'shell', 'iopub', 'stdin'):
265 281 name = channel + '_port'
266 282 if getattr(self, name) == 0 and name in cfg:
267 283 # not overridden by config or cl_args
268 284 setattr(self, name, cfg[name])
269 285 if 'key' in cfg:
270 286 self.config.Session.key = str_to_bytes(cfg['key'])
287
271 288
272 289 def init_ssh(self):
273 290 """set up ssh tunnels, if needed."""
274 291 if not self.sshserver and not self.sshkey:
275 292 return
276 293
294 if self.transport != 'tcp':
295 self.log.error("Can only use ssh tunnels with TCP sockets, not %s", self.transport)
296 return
297
277 298 if self.sshkey and not self.sshserver:
278 299 # specifying just the key implies that we are connecting directly
279 300 self.sshserver = self.ip
280 301 self.ip = LOCALHOST
281 302
282 303 # build connection dict for tunnels:
283 304 info = dict(ip=self.ip,
284 305 shell_port=self.shell_port,
285 306 iopub_port=self.iopub_port,
286 307 stdin_port=self.stdin_port,
287 308 hb_port=self.hb_port
288 309 )
289 310
290 311 self.log.info("Forwarding connections to %s via %s"%(self.ip, self.sshserver))
291 312
292 313 # tunnels return a new set of ports, which will be on localhost:
293 314 self.ip = LOCALHOST
294 315 try:
295 316 newports = tunnel_to_kernel(info, self.sshserver, self.sshkey)
296 317 except:
297 318 # even catch KeyboardInterrupt
298 319 self.log.error("Could not setup tunnels", exc_info=True)
299 320 self.exit(1)
300 321
301 322 self.shell_port, self.iopub_port, self.stdin_port, self.hb_port = newports
302 323
303 324 cf = self.connection_file
304 325 base,ext = os.path.splitext(cf)
305 326 base = os.path.basename(base)
306 327 self.connection_file = os.path.basename(base)+'-ssh'+ext
307 328 self.log.critical("To connect another client via this tunnel, use:")
308 329 self.log.critical("--existing %s" % self.connection_file)
309 330
310 331 def _new_connection_file(self):
311 332 cf = ''
312 333 while not cf:
313 334 # we don't need a 128b id to distinguish kernels, use more readable
314 335 # 48b node segment (12 hex chars). Users running more than 32k simultaneous
315 336 # kernels can subclass.
316 337 ident = str(uuid.uuid4()).split('-')[-1]
317 338 cf = os.path.join(self.profile_dir.security_dir, 'kernel-%s.json' % ident)
318 339 # only keep if it's actually new. Protect against unlikely collision
319 340 # in 48b random search space
320 341 cf = cf if not os.path.exists(cf) else ''
321 342 return cf
322 343
323 344 def init_kernel_manager(self):
324 345 # Don't let Qt or ZMQ swallow KeyboardInterupts.
325 346 signal.signal(signal.SIGINT, signal.SIG_DFL)
326 347
327 348 # Create a KernelManager and start a kernel.
328 349 self.kernel_manager = self.kernel_manager_class(
350 transport=self.transport,
329 351 ip=self.ip,
330 352 shell_port=self.shell_port,
331 353 iopub_port=self.iopub_port,
332 354 stdin_port=self.stdin_port,
333 355 hb_port=self.hb_port,
334 356 connection_file=self.connection_file,
335 357 config=self.config,
336 358 )
337 359 # start the kernel
338 360 if not self.existing:
339 361 self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv)
340 362 elif self.sshserver:
341 363 # ssh, write new connection file
342 364 self.kernel_manager.write_connection_file()
343 365 atexit.register(self.kernel_manager.cleanup_connection_file)
344 366 self.kernel_manager.start_channels()
345 367
346 368
347 369 def initialize(self, argv=None):
348 370 """
349 371 Classes which mix this class in should call:
350 372 IPythonConsoleApp.initialize(self,argv)
351 373 """
352 374 self.init_connection_file()
353 375 default_secure(self.config)
354 376 self.init_ssh()
355 377 self.init_kernel_manager()
356 378
@@ -1,208 +1,218 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import atexit
7 7 import json
8 8 import os
9 9 import socket
10 10 from subprocess import Popen, PIPE
11 11 import sys
12 12 import tempfile
13 13
14 14 # System library imports
15 15
16 16 # IPython imports
17 17 from IPython.utils.localinterfaces import LOCALHOST
18 18 from IPython.utils.py3compat import bytes_to_str
19 19
20 20 # Local imports.
21 21 from parentpoller import ParentPollerWindows
22 22
23 23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 ip=LOCALHOST, key=b''):
24 ip=LOCALHOST, key=b'', transport='tcp'):
25 25 """Generates a JSON config file, including the selection of random ports.
26 26
27 27 Parameters
28 28 ----------
29 29
30 30 fname : unicode
31 31 The path to the file to write
32 32
33 33 shell_port : int, optional
34 34 The port to use for XREP channel.
35 35
36 36 iopub_port : int, optional
37 37 The port to use for the SUB channel.
38 38
39 39 stdin_port : int, optional
40 40 The port to use for the REQ (raw input) channel.
41 41
42 42 hb_port : int, optional
43 43 The port to use for the hearbeat REP channel.
44 44
45 45 ip : str, optional
46 46 The ip address the kernel will bind to.
47 47
48 48 key : str, optional
49 49 The Session key used for HMAC authentication.
50 50
51 51 """
52 52 # default to temporary connector file
53 53 if not fname:
54 54 fname = tempfile.mktemp('.json')
55 55
56 56 # Find open ports as necessary.
57
57 58 ports = []
58 59 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 60 int(stdin_port <= 0) + int(hb_port <= 0)
60 for i in xrange(ports_needed):
61 sock = socket.socket()
62 sock.bind(('', 0))
63 ports.append(sock)
64 for i, sock in enumerate(ports):
65 port = sock.getsockname()[1]
66 sock.close()
67 ports[i] = port
61 if transport == 'tcp':
62 for i in range(ports_needed):
63 sock = socket.socket()
64 sock.bind(('', 0))
65 ports.append(sock)
66 for i, sock in enumerate(ports):
67 port = sock.getsockname()[1]
68 sock.close()
69 ports[i] = port
70 else:
71 N = 1
72 for i in range(ports_needed):
73 while os.path.exists("%s-%s" % (ip, str(N))):
74 N += 1
75 ports.append(N)
76 N += 1
68 77 if shell_port <= 0:
69 78 shell_port = ports.pop(0)
70 79 if iopub_port <= 0:
71 80 iopub_port = ports.pop(0)
72 81 if stdin_port <= 0:
73 82 stdin_port = ports.pop(0)
74 83 if hb_port <= 0:
75 84 hb_port = ports.pop(0)
76 85
77 86 cfg = dict( shell_port=shell_port,
78 87 iopub_port=iopub_port,
79 88 stdin_port=stdin_port,
80 89 hb_port=hb_port,
81 90 )
82 91 cfg['ip'] = ip
83 92 cfg['key'] = bytes_to_str(key)
93 cfg['transport'] = transport
84 94
85 95 with open(fname, 'w') as f:
86 96 f.write(json.dumps(cfg, indent=2))
87 97
88 98 return fname, cfg
89 99
90 100
91 101 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
92 102 executable=None, independent=False, extra_arguments=[]):
93 103 """ Launches a localhost kernel, binding to the specified ports.
94 104
95 105 Parameters
96 106 ----------
97 107 code : str,
98 108 A string of Python code that imports and executes a kernel entry point.
99 109
100 110 stdin, stdout, stderr : optional (default None)
101 111 Standards streams, as defined in subprocess.Popen.
102 112
103 113 fname : unicode, optional
104 114 The JSON connector file, containing ip/port/hmac key information.
105 115
106 116 key : str, optional
107 117 The Session key used for HMAC authentication.
108 118
109 119 executable : str, optional (default sys.executable)
110 120 The Python executable to use for the kernel process.
111 121
112 122 independent : bool, optional (default False)
113 123 If set, the kernel process is guaranteed to survive if this process
114 124 dies. If not set, an effort is made to ensure that the kernel is killed
115 125 when this process dies. Note that in this case it is still good practice
116 126 to kill kernels manually before exiting.
117 127
118 128 extra_arguments = list, optional
119 129 A list of extra arguments to pass when executing the launch code.
120 130
121 131 Returns
122 132 -------
123 133 A tuple of form:
124 134 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
125 135 where kernel_process is a Popen object and the ports are integers.
126 136 """
127 137
128 138 # Build the kernel launch command.
129 139 if executable is None:
130 140 executable = sys.executable
131 141 arguments = [ executable, '-c', code, '-f', fname ]
132 142 arguments.extend(extra_arguments)
133 143
134 144 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
135 145 # are invalid. Unfortunately, there is in general no way to detect whether
136 146 # they are valid. The following two blocks redirect them to (temporary)
137 147 # pipes in certain important cases.
138 148
139 149 # If this process has been backgrounded, our stdin is invalid. Since there
140 150 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
141 151 # place this one safe and always redirect.
142 152 redirect_in = True
143 153 _stdin = PIPE if stdin is None else stdin
144 154
145 155 # If this process in running on pythonw, we know that stdin, stdout, and
146 156 # stderr are all invalid.
147 157 redirect_out = sys.executable.endswith('pythonw.exe')
148 158 if redirect_out:
149 159 _stdout = PIPE if stdout is None else stdout
150 160 _stderr = PIPE if stderr is None else stderr
151 161 else:
152 162 _stdout, _stderr = stdout, stderr
153 163
154 164 # Spawn a kernel.
155 165 if sys.platform == 'win32':
156 166 # Create a Win32 event for interrupting the kernel.
157 167 interrupt_event = ParentPollerWindows.create_interrupt_event()
158 168 arguments += [ '--interrupt=%i'%interrupt_event ]
159 169
160 170 # If the kernel is running on pythonw and stdout/stderr are not been
161 171 # re-directed, it will crash when more than 4KB of data is written to
162 172 # stdout or stderr. This is a bug that has been with Python for a very
163 173 # long time; see http://bugs.python.org/issue706263.
164 174 # A cleaner solution to this problem would be to pass os.devnull to
165 175 # Popen directly. Unfortunately, that does not work.
166 176 if executable.endswith('pythonw.exe'):
167 177 if stdout is None:
168 178 arguments.append('--no-stdout')
169 179 if stderr is None:
170 180 arguments.append('--no-stderr')
171 181
172 182 # Launch the kernel process.
173 183 if independent:
174 184 proc = Popen(arguments,
175 185 creationflags=512, # CREATE_NEW_PROCESS_GROUP
176 186 stdin=_stdin, stdout=_stdout, stderr=_stderr)
177 187 else:
178 188 from _subprocess import DuplicateHandle, GetCurrentProcess, \
179 189 DUPLICATE_SAME_ACCESS
180 190 pid = GetCurrentProcess()
181 191 handle = DuplicateHandle(pid, pid, pid, 0,
182 192 True, # Inheritable by new processes.
183 193 DUPLICATE_SAME_ACCESS)
184 194 proc = Popen(arguments + ['--parent=%i'%int(handle)],
185 195 stdin=_stdin, stdout=_stdout, stderr=_stderr)
186 196
187 197 # Attach the interrupt event to the Popen objet so it can be used later.
188 198 proc.win32_interrupt_event = interrupt_event
189 199
190 200 else:
191 201 if independent:
192 202 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
193 203 stdin=_stdin, stdout=_stdout, stderr=_stderr)
194 204 else:
195 205 proc = Popen(arguments + ['--parent=1'],
196 206 stdin=_stdin, stdout=_stdout, stderr=_stderr)
197 207
198 208 # Clean up pipes created to work around Popen bug.
199 209 if redirect_in:
200 210 if stdin is None:
201 211 proc.stdin.close()
202 212 if redirect_out:
203 213 if stdout is None:
204 214 proc.stdout.close()
205 215 if stderr is None:
206 216 proc.stderr.close()
207 217
208 218 return proc
@@ -1,48 +1,56 b''
1 1 """The client and server for a basic ping-pong style heartbeat.
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008-2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 import os
15 16 import socket
16 17 import sys
17 18 from threading import Thread
18 19
19 20 import zmq
20 21
21 22 from IPython.utils.localinterfaces import LOCALHOST
22 23
23 24 #-----------------------------------------------------------------------------
24 25 # Code
25 26 #-----------------------------------------------------------------------------
26 27
27 28
28 29 class Heartbeat(Thread):
29 30 "A simple ping-pong style heartbeat that runs in a thread."
30 31
31 def __init__(self, context, addr=(LOCALHOST, 0)):
32 def __init__(self, context, addr=('tcp', LOCALHOST, 0)):
32 33 Thread.__init__(self)
33 34 self.context = context
34 self.ip, self.port = addr
35 self.transport, self.ip, self.port = addr
35 36 if self.port == 0:
36 s = socket.socket()
37 # '*' means all interfaces to 0MQ, which is '' to socket.socket
38 s.bind(('' if self.ip == '*' else self.ip, 0))
39 self.port = s.getsockname()[1]
40 s.close()
37 if addr[0] == 'tcp':
38 s = socket.socket()
39 # '*' means all interfaces to 0MQ, which is '' to socket.socket
40 s.bind(('' if self.ip == '*' else self.ip, 0))
41 self.port = s.getsockname()[1]
42 s.close()
43 elif addr[0] == 'ipc':
44 while os.path.exists(self.ip + '-' + self.port):
45 self.port = self.port + 1
46 else:
47 raise ValueError("Unrecognized zmq transport: %s" % addr[0])
41 48 self.addr = (self.ip, self.port)
42 49 self.daemon = True
43 50
44 51 def run(self):
45 52 self.socket = self.context.socket(zmq.REP)
46 self.socket.bind('tcp://%s:%i' % self.addr)
53 c = ':' if self.transport == 'tcp' else '-'
54 self.socket.bind('%s://%s' % (self.transport, self.ip) + c + str(self.port))
47 55 zmq.device(zmq.FORWARDER, self.socket, self.socket)
48 56
@@ -1,333 +1,351 b''
1 1 """An Application for launching a kernel
2 2
3 3 Authors
4 4 -------
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING.txt, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports
19 19 import atexit
20 20 import json
21 21 import os
22 22 import sys
23 23 import signal
24 24
25 25 # System library imports
26 26 import zmq
27 27 from zmq.eventloop import ioloop
28 28
29 29 # IPython imports
30 30 from IPython.core.ultratb import FormattedTB
31 31 from IPython.core.application import (
32 32 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
33 33 )
34 34 from IPython.utils import io
35 35 from IPython.utils.localinterfaces import LOCALHOST
36 36 from IPython.utils.path import filefind
37 37 from IPython.utils.py3compat import str_to_bytes
38 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
39 DottedObjectName)
38 from IPython.utils.traitlets import (
39 Any, Instance, Dict, Unicode, Integer, Bool, CaselessStrEnum,
40 DottedObjectName,
41 )
40 42 from IPython.utils.importstring import import_item
41 43 # local imports
42 44 from IPython.zmq.entry_point import write_connection_file
43 45 from IPython.zmq.heartbeat import Heartbeat
44 46 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
45 47 from IPython.zmq.session import (
46 48 Session, session_flags, session_aliases, default_secure,
47 49 )
48 50
49 51
50 52 #-----------------------------------------------------------------------------
51 53 # Flags and Aliases
52 54 #-----------------------------------------------------------------------------
53 55
54 56 kernel_aliases = dict(base_aliases)
55 57 kernel_aliases.update({
56 58 'ip' : 'KernelApp.ip',
57 59 'hb' : 'KernelApp.hb_port',
58 60 'shell' : 'KernelApp.shell_port',
59 61 'iopub' : 'KernelApp.iopub_port',
60 62 'stdin' : 'KernelApp.stdin_port',
61 63 'f' : 'KernelApp.connection_file',
62 64 'parent': 'KernelApp.parent',
63 65 })
64 66 if sys.platform.startswith('win'):
65 67 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
66 68
67 69 kernel_flags = dict(base_flags)
68 70 kernel_flags.update({
69 71 'no-stdout' : (
70 72 {'KernelApp' : {'no_stdout' : True}},
71 73 "redirect stdout to the null device"),
72 74 'no-stderr' : (
73 75 {'KernelApp' : {'no_stderr' : True}},
74 76 "redirect stderr to the null device"),
75 77 })
76 78
77 79 # inherit flags&aliases for Sessions
78 80 kernel_aliases.update(session_aliases)
79 81 kernel_flags.update(session_flags)
80 82
81 83
82 84
83 85 #-----------------------------------------------------------------------------
84 86 # Application class for starting a Kernel
85 87 #-----------------------------------------------------------------------------
86 88
87 89 class KernelApp(BaseIPythonApplication):
88 90 name='ipkernel'
89 91 aliases = Dict(kernel_aliases)
90 92 flags = Dict(kernel_flags)
91 93 classes = [Session]
92 94 # the kernel class, as an importstring
93 95 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
94 96 kernel = Any()
95 97 poller = Any() # don't restrict this even though current pollers are all Threads
96 98 heartbeat = Instance(Heartbeat)
97 99 session = Instance('IPython.zmq.session.Session')
98 100 ports = Dict()
99 101 _full_connection_file = Unicode()
100 102
101 103 # inherit config file name from parent:
102 104 parent_appname = Unicode(config=True)
103 105 def _parent_appname_changed(self, name, old, new):
104 106 if self.config_file_specified:
105 107 # it was manually specified, ignore
106 108 return
107 109 self.config_file_name = new.replace('-','_') + u'_config.py'
108 110 # don't let this count as specifying the config file
109 111 self.config_file_specified = False
110 112
111 113 # connection info:
114 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
112 115 ip = Unicode(LOCALHOST, config=True,
113 116 help="Set the IP or interface on which the kernel will listen.")
114 117 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
115 118 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
116 119 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
117 120 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
118 121 connection_file = Unicode('', config=True,
119 122 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
120 123
121 124 This file will contain the IP, ports, and authentication key needed to connect
122 125 clients to this kernel. By default, this file will be created in the security-dir
123 126 of the current profile, but can be specified by absolute path.
124 127 """)
125 128
126 129 # streams, etc.
127 130 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
128 131 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
129 132 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
130 133 config=True, help="The importstring for the OutStream factory")
131 134 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
132 135 config=True, help="The importstring for the DisplayHook factory")
133 136
134 137 # polling
135 138 parent = Integer(0, config=True,
136 139 help="""kill this process if its parent dies. On Windows, the argument
137 140 specifies the HANDLE of the parent process, otherwise it is simply boolean.
138 141 """)
139 142 interrupt = Integer(0, config=True,
140 143 help="""ONLY USED ON WINDOWS
141 144 Interrupt this process when the parent is signalled.
142 145 """)
143 146
144 147 def init_crash_handler(self):
145 148 # Install minimal exception handling
146 149 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
147 150 ostream=sys.__stdout__)
148 151
149 152 def init_poller(self):
150 153 if sys.platform == 'win32':
151 154 if self.interrupt or self.parent:
152 155 self.poller = ParentPollerWindows(self.interrupt, self.parent)
153 156 elif self.parent:
154 157 self.poller = ParentPollerUnix()
155 158
156 159 def _bind_socket(self, s, port):
157 iface = 'tcp://%s' % self.ip
158 if port <= 0:
160 iface = '%s://%s' % (self.transport, self.ip)
161 if port <= 0 and self.transport == 'tcp':
159 162 port = s.bind_to_random_port(iface)
160 163 else:
161 s.bind(iface + ':%i'%port)
164 c = ':' if self.transport == 'tcp' else '-'
165 s.bind(iface + c + str(port))
162 166 return port
163 167
164 168 def load_connection_file(self):
165 169 """load ip/port/hmac config from JSON connection file"""
166 170 try:
167 171 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
168 172 except IOError:
169 173 self.log.debug("Connection file not found: %s", self.connection_file)
170 174 # This means I own it, so I will clean it up:
171 175 atexit.register(self.cleanup_connection_file)
172 176 return
173 177 self.log.debug(u"Loading connection file %s", fname)
174 178 with open(fname) as f:
175 179 s = f.read()
176 180 cfg = json.loads(s)
181 self.transport = cfg.get('transport', self.transport)
177 182 if self.ip == LOCALHOST and 'ip' in cfg:
178 183 # not overridden by config or cl_args
179 184 self.ip = cfg['ip']
180 185 for channel in ('hb', 'shell', 'iopub', 'stdin'):
181 186 name = channel + '_port'
182 187 if getattr(self, name) == 0 and name in cfg:
183 188 # not overridden by config or cl_args
184 189 setattr(self, name, cfg[name])
185 190 if 'key' in cfg:
186 191 self.config.Session.key = str_to_bytes(cfg['key'])
187 192
188 193 def write_connection_file(self):
189 194 """write connection info to JSON file"""
190 195 if os.path.basename(self.connection_file) == self.connection_file:
191 196 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
192 197 else:
193 198 cf = self.connection_file
194 write_connection_file(cf, ip=self.ip, key=self.session.key,
199 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
195 200 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
196 201 iopub_port=self.iopub_port)
197 202
198 203 self._full_connection_file = cf
199 204
200 205 def cleanup_connection_file(self):
201 206 cf = self._full_connection_file
202 207 self.log.debug("cleaning up connection file: %r", cf)
203 208 try:
204 209 os.remove(cf)
205 210 except (IOError, OSError):
206 211 pass
212
213 self._cleanup_ipc_files()
214
215 def _cleanup_ipc_files(self):
216 """cleanup ipc files if we wrote them"""
217 if self.transport != 'ipc':
218 return
219 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
220 ipcfile = "%s-%i" % (self.ip, port)
221 try:
222 os.remove(ipcfile)
223 except (IOError, OSError):
224 pass
207 225
208 226 def init_connection_file(self):
209 227 if not self.connection_file:
210 228 self.connection_file = "kernel-%s.json"%os.getpid()
211 229 try:
212 230 self.load_connection_file()
213 231 except Exception:
214 232 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
215 233 self.exit(1)
216 234
217 235 def init_sockets(self):
218 236 # Create a context, a session, and the kernel sockets.
219 237 self.log.info("Starting the kernel at pid: %i", os.getpid())
220 238 context = zmq.Context.instance()
221 239 # Uncomment this to try closing the context.
222 240 # atexit.register(context.term)
223 241
224 242 self.shell_socket = context.socket(zmq.ROUTER)
225 243 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
226 244 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
227 245
228 246 self.iopub_socket = context.socket(zmq.PUB)
229 247 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
230 248 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
231 249
232 250 self.stdin_socket = context.socket(zmq.ROUTER)
233 251 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
234 252 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
235 253
236 254 def init_heartbeat(self):
237 255 """start the heart beating"""
238 256 # heartbeat doesn't share context, because it mustn't be blocked
239 257 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
240 258 hb_ctx = zmq.Context()
241 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
259 self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port))
242 260 self.hb_port = self.heartbeat.port
243 261 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
244 262 self.heartbeat.start()
245 263
246 264 # Helper to make it easier to connect to an existing kernel.
247 265 # set log-level to critical, to make sure it is output
248 266 self.log.critical("To connect another client to this kernel, use:")
249 267
250 268 def log_connection_info(self):
251 269 """display connection info, and store ports"""
252 270 basename = os.path.basename(self.connection_file)
253 271 if basename == self.connection_file or \
254 272 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
255 273 # use shortname
256 274 tail = basename
257 275 if self.profile != 'default':
258 276 tail += " --profile %s" % self.profile
259 277 else:
260 278 tail = self.connection_file
261 279 self.log.critical("--existing %s", tail)
262 280
263 281
264 282 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
265 283 stdin=self.stdin_port, hb=self.hb_port)
266 284
267 285 def init_session(self):
268 286 """create our session object"""
269 287 default_secure(self.config)
270 288 self.session = Session(config=self.config, username=u'kernel')
271 289
272 290 def init_blackhole(self):
273 291 """redirects stdout/stderr to devnull if necessary"""
274 292 if self.no_stdout or self.no_stderr:
275 293 blackhole = open(os.devnull, 'w')
276 294 if self.no_stdout:
277 295 sys.stdout = sys.__stdout__ = blackhole
278 296 if self.no_stderr:
279 297 sys.stderr = sys.__stderr__ = blackhole
280 298
281 299 def init_io(self):
282 300 """Redirect input streams and set a display hook."""
283 301 if self.outstream_class:
284 302 outstream_factory = import_item(str(self.outstream_class))
285 303 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
286 304 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
287 305 if self.displayhook_class:
288 306 displayhook_factory = import_item(str(self.displayhook_class))
289 307 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
290 308
291 309 def init_signal(self):
292 310 signal.signal(signal.SIGINT, signal.SIG_IGN)
293 311
294 312 def init_kernel(self):
295 313 """Create the Kernel object itself"""
296 314 kernel_factory = import_item(str(self.kernel_class))
297 315 self.kernel = kernel_factory(config=self.config, session=self.session,
298 316 shell_socket=self.shell_socket,
299 317 iopub_socket=self.iopub_socket,
300 318 stdin_socket=self.stdin_socket,
301 319 log=self.log
302 320 )
303 321 self.kernel.record_ports(self.ports)
304 322
305 323 @catch_config_error
306 324 def initialize(self, argv=None):
307 325 super(KernelApp, self).initialize(argv)
308 326 self.init_blackhole()
309 327 self.init_connection_file()
310 328 self.init_session()
311 329 self.init_poller()
312 330 self.init_sockets()
313 331 self.init_heartbeat()
314 332 # writing/displaying connection info must be *after* init_sockets/heartbeat
315 333 self.log_connection_info()
316 334 self.write_connection_file()
317 335 self.init_io()
318 336 self.init_signal()
319 337 self.init_kernel()
320 338 # flush stdout/stderr, so that anything written to these streams during
321 339 # initialization do not get associated with the first execution request
322 340 sys.stdout.flush()
323 341 sys.stderr.flush()
324 342
325 343 def start(self):
326 344 if self.poller is not None:
327 345 self.poller.start()
328 346 self.kernel.start()
329 347 try:
330 348 ioloop.IOLoop.instance().start()
331 349 except KeyboardInterrupt:
332 350 pass
333 351
@@ -1,994 +1,1024 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Constants and exceptions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 class InvalidPortNumber(Exception):
51 51 pass
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Utility functions
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # some utilities to validate message structure, these might get moved elsewhere
58 58 # if they prove to have more generic utility
59 59
60 60 def validate_string_list(lst):
61 61 """Validate that the input is a list of strings.
62 62
63 63 Raises ValueError if not."""
64 64 if not isinstance(lst, list):
65 65 raise ValueError('input %r must be a list' % lst)
66 66 for x in lst:
67 67 if not isinstance(x, basestring):
68 68 raise ValueError('element %r in list must be a string' % x)
69 69
70 70
71 71 def validate_string_dict(dct):
72 72 """Validate that the input is a dict with string keys and values.
73 73
74 74 Raises ValueError if not."""
75 75 for k,v in dct.iteritems():
76 76 if not isinstance(k, basestring):
77 77 raise ValueError('key %r in dict must be a string' % k)
78 78 if not isinstance(v, basestring):
79 79 raise ValueError('value %r in dict must be a string' % v)
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # ZMQ Socket Channel classes
84 84 #-----------------------------------------------------------------------------
85 85
86 86 class ZMQSocketChannel(Thread):
87 87 """The base class for the channels that use ZMQ sockets.
88 88 """
89 89 context = None
90 90 session = None
91 91 socket = None
92 92 ioloop = None
93 93 stream = None
94 94 _address = None
95 95 _exiting = False
96 96
97 97 def __init__(self, context, session, address):
98 98 """Create a channel
99 99
100 100 Parameters
101 101 ----------
102 102 context : :class:`zmq.Context`
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 address : tuple
106 address : zmq url
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
110 110 self.daemon = True
111 111
112 112 self.context = context
113 113 self.session = session
114 if address[1] == 0:
115 message = 'The port number for a channel cannot be 0.'
116 raise InvalidPortNumber(message)
114 if isinstance(address, tuple):
115 if address[1] == 0:
116 message = 'The port number for a channel cannot be 0.'
117 raise InvalidPortNumber(message)
118 address = "tcp://%s:%i" % address
117 119 self._address = address
118 120 atexit.register(self._notice_exit)
119 121
120 122 def _notice_exit(self):
121 123 self._exiting = True
122 124
123 125 def _run_loop(self):
124 126 """Run my loop, ignoring EINTR events in the poller"""
125 127 while True:
126 128 try:
127 129 self.ioloop.start()
128 130 except ZMQError as e:
129 131 if e.errno == errno.EINTR:
130 132 continue
131 133 else:
132 134 raise
133 135 except Exception:
134 136 if self._exiting:
135 137 break
136 138 else:
137 139 raise
138 140 else:
139 141 break
140 142
141 143 def stop(self):
142 144 """Stop the channel's activity.
143 145
144 146 This calls :method:`Thread.join` and returns when the thread
145 147 terminates. :class:`RuntimeError` will be raised if
146 148 :method:`self.start` is called again.
147 149 """
148 150 self.join()
149 151
150 152 @property
151 153 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
153
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
154 """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555').
156 155 """
157 156 return self._address
158 157
159 158 def _queue_send(self, msg):
160 159 """Queue a message to be sent from the IOLoop's thread.
161 160
162 161 Parameters
163 162 ----------
164 163 msg : message to send
165 164
166 165 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 166 thread control of the action.
168 167 """
169 168 def thread_send():
170 169 self.session.send(self.stream, msg)
171 170 self.ioloop.add_callback(thread_send)
172 171
173 172 def _handle_recv(self, msg):
174 173 """callback for stream.on_recv
175 174
176 175 unpacks message, and calls handlers with it.
177 176 """
178 177 ident,smsg = self.session.feed_identities(msg)
179 178 self.call_handlers(self.session.unserialize(smsg))
180 179
181 180
182 181
183 182 class ShellSocketChannel(ZMQSocketChannel):
184 183 """The XREQ channel for issues request/replies to the kernel.
185 184 """
186 185
187 186 command_queue = None
188 187 # flag for whether execute requests should be allowed to call raw_input:
189 188 allow_stdin = True
190 189
191 190 def __init__(self, context, session, address):
192 191 super(ShellSocketChannel, self).__init__(context, session, address)
193 192 self.ioloop = ioloop.IOLoop()
194 193
195 194 def run(self):
196 195 """The thread's main activity. Call start() instead."""
197 196 self.socket = self.context.socket(zmq.DEALER)
198 197 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
198 self.socket.connect(self.address)
200 199 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 200 self.stream.on_recv(self._handle_recv)
202 201 self._run_loop()
203 202 try:
204 203 self.socket.close()
205 204 except:
206 205 pass
207 206
208 207 def stop(self):
209 208 self.ioloop.stop()
210 209 super(ShellSocketChannel, self).stop()
211 210
212 211 def call_handlers(self, msg):
213 212 """This method is called in the ioloop thread when a message arrives.
214 213
215 214 Subclasses should override this method to handle incoming messages.
216 215 It is important to remember that this method is called in the thread
217 216 so that some logic must be done to ensure that the application leve
218 217 handlers are called in the application thread.
219 218 """
220 219 raise NotImplementedError('call_handlers must be defined in a subclass.')
221 220
222 221 def execute(self, code, silent=False,
223 222 user_variables=None, user_expressions=None, allow_stdin=None):
224 223 """Execute code in the kernel.
225 224
226 225 Parameters
227 226 ----------
228 227 code : str
229 228 A string of Python code.
230 229
231 230 silent : bool, optional (default False)
232 231 If set, the kernel will execute the code as quietly possible.
233 232
234 233 user_variables : list, optional
235 234 A list of variable names to pull from the user's namespace. They
236 235 will come back as a dict with these names as keys and their
237 236 :func:`repr` as values.
238 237
239 238 user_expressions : dict, optional
240 239 A dict with string keys and to pull from the user's
241 240 namespace. They will come back as a dict with these names as keys
242 241 and their :func:`repr` as values.
243 242
244 243 allow_stdin : bool, optional
245 244 Flag for
246 245 A dict with string keys and to pull from the user's
247 246 namespace. They will come back as a dict with these names as keys
248 247 and their :func:`repr` as values.
249 248
250 249 Returns
251 250 -------
252 251 The msg_id of the message sent.
253 252 """
254 253 if user_variables is None:
255 254 user_variables = []
256 255 if user_expressions is None:
257 256 user_expressions = {}
258 257 if allow_stdin is None:
259 258 allow_stdin = self.allow_stdin
260 259
261 260
262 261 # Don't waste network traffic if inputs are invalid
263 262 if not isinstance(code, basestring):
264 263 raise ValueError('code %r must be a string' % code)
265 264 validate_string_list(user_variables)
266 265 validate_string_dict(user_expressions)
267 266
268 267 # Create class for content/msg creation. Related to, but possibly
269 268 # not in Session.
270 269 content = dict(code=code, silent=silent,
271 270 user_variables=user_variables,
272 271 user_expressions=user_expressions,
273 272 allow_stdin=allow_stdin,
274 273 )
275 274 msg = self.session.msg('execute_request', content)
276 275 self._queue_send(msg)
277 276 return msg['header']['msg_id']
278 277
279 278 def complete(self, text, line, cursor_pos, block=None):
280 279 """Tab complete text in the kernel's namespace.
281 280
282 281 Parameters
283 282 ----------
284 283 text : str
285 284 The text to complete.
286 285 line : str
287 286 The full line of text that is the surrounding context for the
288 287 text to complete.
289 288 cursor_pos : int
290 289 The position of the cursor in the line where the completion was
291 290 requested.
292 291 block : str, optional
293 292 The full block of code in which the completion is being requested.
294 293
295 294 Returns
296 295 -------
297 296 The msg_id of the message sent.
298 297 """
299 298 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
300 299 msg = self.session.msg('complete_request', content)
301 300 self._queue_send(msg)
302 301 return msg['header']['msg_id']
303 302
304 303 def object_info(self, oname, detail_level=0):
305 304 """Get metadata information about an object.
306 305
307 306 Parameters
308 307 ----------
309 308 oname : str
310 309 A string specifying the object name.
311 310 detail_level : int, optional
312 311 The level of detail for the introspection (0-2)
313 312
314 313 Returns
315 314 -------
316 315 The msg_id of the message sent.
317 316 """
318 317 content = dict(oname=oname, detail_level=detail_level)
319 318 msg = self.session.msg('object_info_request', content)
320 319 self._queue_send(msg)
321 320 return msg['header']['msg_id']
322 321
323 322 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
324 323 """Get entries from the history list.
325 324
326 325 Parameters
327 326 ----------
328 327 raw : bool
329 328 If True, return the raw input.
330 329 output : bool
331 330 If True, then return the output as well.
332 331 hist_access_type : str
333 332 'range' (fill in session, start and stop params), 'tail' (fill in n)
334 333 or 'search' (fill in pattern param).
335 334
336 335 session : int
337 336 For a range request, the session from which to get lines. Session
338 337 numbers are positive integers; negative ones count back from the
339 338 current session.
340 339 start : int
341 340 The first line number of a history range.
342 341 stop : int
343 342 The final (excluded) line number of a history range.
344 343
345 344 n : int
346 345 The number of lines of history to get for a tail request.
347 346
348 347 pattern : str
349 348 The glob-syntax pattern for a search request.
350 349
351 350 Returns
352 351 -------
353 352 The msg_id of the message sent.
354 353 """
355 354 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
356 355 **kwargs)
357 356 msg = self.session.msg('history_request', content)
358 357 self._queue_send(msg)
359 358 return msg['header']['msg_id']
360 359
361 360 def shutdown(self, restart=False):
362 361 """Request an immediate kernel shutdown.
363 362
364 363 Upon receipt of the (empty) reply, client code can safely assume that
365 364 the kernel has shut down and it's safe to forcefully terminate it if
366 365 it's still alive.
367 366
368 367 The kernel will send the reply via a function registered with Python's
369 368 atexit module, ensuring it's truly done as the kernel is done with all
370 369 normal operation.
371 370 """
372 371 # Send quit message to kernel. Once we implement kernel-side setattr,
373 372 # this should probably be done that way, but for now this will do.
374 373 msg = self.session.msg('shutdown_request', {'restart':restart})
375 374 self._queue_send(msg)
376 375 return msg['header']['msg_id']
377 376
378 377
379 378
380 379 class SubSocketChannel(ZMQSocketChannel):
381 380 """The SUB channel which listens for messages that the kernel publishes.
382 381 """
383 382
384 383 def __init__(self, context, session, address):
385 384 super(SubSocketChannel, self).__init__(context, session, address)
386 385 self.ioloop = ioloop.IOLoop()
387 386
388 387 def run(self):
389 388 """The thread's main activity. Call start() instead."""
390 389 self.socket = self.context.socket(zmq.SUB)
391 390 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
392 391 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
393 self.socket.connect('tcp://%s:%i' % self.address)
392 self.socket.connect(self.address)
394 393 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
395 394 self.stream.on_recv(self._handle_recv)
396 395 self._run_loop()
397 396 try:
398 397 self.socket.close()
399 398 except:
400 399 pass
401 400
402 401 def stop(self):
403 402 self.ioloop.stop()
404 403 super(SubSocketChannel, self).stop()
405 404
406 405 def call_handlers(self, msg):
407 406 """This method is called in the ioloop thread when a message arrives.
408 407
409 408 Subclasses should override this method to handle incoming messages.
410 409 It is important to remember that this method is called in the thread
411 410 so that some logic must be done to ensure that the application leve
412 411 handlers are called in the application thread.
413 412 """
414 413 raise NotImplementedError('call_handlers must be defined in a subclass.')
415 414
416 415 def flush(self, timeout=1.0):
417 416 """Immediately processes all pending messages on the SUB channel.
418 417
419 418 Callers should use this method to ensure that :method:`call_handlers`
420 419 has been called for all messages that have been received on the
421 420 0MQ SUB socket of this channel.
422 421
423 422 This method is thread safe.
424 423
425 424 Parameters
426 425 ----------
427 426 timeout : float, optional
428 427 The maximum amount of time to spend flushing, in seconds. The
429 428 default is one second.
430 429 """
431 430 # We do the IOLoop callback process twice to ensure that the IOLoop
432 431 # gets to perform at least one full poll.
433 432 stop_time = time.time() + timeout
434 433 for i in xrange(2):
435 434 self._flushed = False
436 435 self.ioloop.add_callback(self._flush)
437 436 while not self._flushed and time.time() < stop_time:
438 437 time.sleep(0.01)
439 438
440 439 def _flush(self):
441 440 """Callback for :method:`self.flush`."""
442 441 self.stream.flush()
443 442 self._flushed = True
444 443
445 444
446 445 class StdInSocketChannel(ZMQSocketChannel):
447 446 """A reply channel to handle raw_input requests that the kernel makes."""
448 447
449 448 msg_queue = None
450 449
451 450 def __init__(self, context, session, address):
452 451 super(StdInSocketChannel, self).__init__(context, session, address)
453 452 self.ioloop = ioloop.IOLoop()
454 453
455 454 def run(self):
456 455 """The thread's main activity. Call start() instead."""
457 456 self.socket = self.context.socket(zmq.DEALER)
458 457 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
459 self.socket.connect('tcp://%s:%i' % self.address)
458 self.socket.connect(self.address)
460 459 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
461 460 self.stream.on_recv(self._handle_recv)
462 461 self._run_loop()
463 462 try:
464 463 self.socket.close()
465 464 except:
466 465 pass
467 466
468 467
469 468 def stop(self):
470 469 self.ioloop.stop()
471 470 super(StdInSocketChannel, self).stop()
472 471
473 472 def call_handlers(self, msg):
474 473 """This method is called in the ioloop thread when a message arrives.
475 474
476 475 Subclasses should override this method to handle incoming messages.
477 476 It is important to remember that this method is called in the thread
478 477 so that some logic must be done to ensure that the application leve
479 478 handlers are called in the application thread.
480 479 """
481 480 raise NotImplementedError('call_handlers must be defined in a subclass.')
482 481
483 482 def input(self, string):
484 483 """Send a string of raw input to the kernel."""
485 484 content = dict(value=string)
486 485 msg = self.session.msg('input_reply', content)
487 486 self._queue_send(msg)
488 487
489 488
490 489 class HBSocketChannel(ZMQSocketChannel):
491 490 """The heartbeat channel which monitors the kernel heartbeat.
492 491
493 492 Note that the heartbeat channel is paused by default. As long as you start
494 493 this channel, the kernel manager will ensure that it is paused and un-paused
495 494 as appropriate.
496 495 """
497 496
498 497 time_to_dead = 3.0
499 498 socket = None
500 499 poller = None
501 500 _running = None
502 501 _pause = None
503 502 _beating = None
504 503
505 504 def __init__(self, context, session, address):
506 505 super(HBSocketChannel, self).__init__(context, session, address)
507 506 self._running = False
508 507 self._pause =True
509 508 self.poller = zmq.Poller()
510 509
511 510 def _create_socket(self):
512 511 if self.socket is not None:
513 512 # close previous socket, before opening a new one
514 513 self.poller.unregister(self.socket)
515 514 self.socket.close()
516 515 self.socket = self.context.socket(zmq.REQ)
517 516 self.socket.setsockopt(zmq.LINGER, 0)
518 self.socket.connect('tcp://%s:%i' % self.address)
517 self.socket.connect(self.address)
519 518
520 519 self.poller.register(self.socket, zmq.POLLIN)
521 520
522 521 def _poll(self, start_time):
523 522 """poll for heartbeat replies until we reach self.time_to_dead
524 523
525 524 Ignores interrupts, and returns the result of poll(), which
526 525 will be an empty list if no messages arrived before the timeout,
527 526 or the event tuple if there is a message to receive.
528 527 """
529 528
530 529 until_dead = self.time_to_dead - (time.time() - start_time)
531 530 # ensure poll at least once
532 531 until_dead = max(until_dead, 1e-3)
533 532 events = []
534 533 while True:
535 534 try:
536 535 events = self.poller.poll(1000 * until_dead)
537 536 except ZMQError as e:
538 537 if e.errno == errno.EINTR:
539 538 # ignore interrupts during heartbeat
540 539 # this may never actually happen
541 540 until_dead = self.time_to_dead - (time.time() - start_time)
542 541 until_dead = max(until_dead, 1e-3)
543 542 pass
544 543 else:
545 544 raise
546 545 except Exception:
547 546 if self._exiting:
548 547 break
549 548 else:
550 549 raise
551 550 else:
552 551 break
553 552 return events
554 553
555 554 def run(self):
556 555 """The thread's main activity. Call start() instead."""
557 556 self._create_socket()
558 557 self._running = True
559 558 self._beating = True
560 559
561 560 while self._running:
562 561 if self._pause:
563 562 # just sleep, and skip the rest of the loop
564 563 time.sleep(self.time_to_dead)
565 564 continue
566 565
567 566 since_last_heartbeat = 0.0
568 567 # io.rprint('Ping from HB channel') # dbg
569 568 # no need to catch EFSM here, because the previous event was
570 569 # either a recv or connect, which cannot be followed by EFSM
571 570 self.socket.send(b'ping')
572 571 request_time = time.time()
573 572 ready = self._poll(request_time)
574 573 if ready:
575 574 self._beating = True
576 575 # the poll above guarantees we have something to recv
577 576 self.socket.recv()
578 577 # sleep the remainder of the cycle
579 578 remainder = self.time_to_dead - (time.time() - request_time)
580 579 if remainder > 0:
581 580 time.sleep(remainder)
582 581 continue
583 582 else:
584 583 # nothing was received within the time limit, signal heart failure
585 584 self._beating = False
586 585 since_last_heartbeat = time.time() - request_time
587 586 self.call_handlers(since_last_heartbeat)
588 587 # and close/reopen the socket, because the REQ/REP cycle has been broken
589 588 self._create_socket()
590 589 continue
591 590 try:
592 591 self.socket.close()
593 592 except:
594 593 pass
595 594
596 595 def pause(self):
597 596 """Pause the heartbeat."""
598 597 self._pause = True
599 598
600 599 def unpause(self):
601 600 """Unpause the heartbeat."""
602 601 self._pause = False
603 602
604 603 def is_beating(self):
605 604 """Is the heartbeat running and responsive (and not paused)."""
606 605 if self.is_alive() and not self._pause and self._beating:
607 606 return True
608 607 else:
609 608 return False
610 609
611 610 def stop(self):
612 611 self._running = False
613 612 super(HBSocketChannel, self).stop()
614 613
615 614 def call_handlers(self, since_last_heartbeat):
616 615 """This method is called in the ioloop thread when a message arrives.
617 616
618 617 Subclasses should override this method to handle incoming messages.
619 618 It is important to remember that this method is called in the thread
620 619 so that some logic must be done to ensure that the application level
621 620 handlers are called in the application thread.
622 621 """
623 622 raise NotImplementedError('call_handlers must be defined in a subclass.')
624 623
625 624
626 625 #-----------------------------------------------------------------------------
627 626 # Main kernel manager class
628 627 #-----------------------------------------------------------------------------
629 628
630 629 class KernelManager(HasTraits):
631 630 """ Manages a kernel for a frontend.
632 631
633 632 The SUB channel is for the frontend to receive messages published by the
634 633 kernel.
635 634
636 635 The REQ channel is for the frontend to make requests of the kernel.
637 636
638 637 The REP channel is for the kernel to request stdin (raw_input) from the
639 638 frontend.
640 639 """
641 640 # config object for passing to child configurables
642 641 config = Instance(Config)
643 642
644 643 # The PyZMQ Context to use for communication with the kernel.
645 644 context = Instance(zmq.Context)
646 645 def _context_default(self):
647 646 return zmq.Context.instance()
648 647
649 648 # The Session to use for communication with the kernel.
650 649 session = Instance(Session)
651 650
652 651 # The kernel process with which the KernelManager is communicating.
653 652 kernel = Instance(Popen)
654 653
655 654 # The addresses for the communication channels.
656 655 connection_file = Unicode('')
656
657 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp')
658
659
657 660 ip = Unicode(LOCALHOST)
658 661 def _ip_changed(self, name, old, new):
659 662 if new == '*':
660 663 self.ip = '0.0.0.0'
661 664 shell_port = Integer(0)
662 665 iopub_port = Integer(0)
663 666 stdin_port = Integer(0)
664 667 hb_port = Integer(0)
665 668
666 669 # The classes to use for the various channels.
667 670 shell_channel_class = Type(ShellSocketChannel)
668 671 sub_channel_class = Type(SubSocketChannel)
669 672 stdin_channel_class = Type(StdInSocketChannel)
670 673 hb_channel_class = Type(HBSocketChannel)
671 674
672 675 # Protected traits.
673 676 _launch_args = Any
674 677 _shell_channel = Any
675 678 _sub_channel = Any
676 679 _stdin_channel = Any
677 680 _hb_channel = Any
678 681 _connection_file_written=Bool(False)
679 682
680 683 def __init__(self, **kwargs):
681 684 super(KernelManager, self).__init__(**kwargs)
682 685 if self.session is None:
683 686 self.session = Session(config=self.config)
684 687
685 688 def __del__(self):
686 689 self.cleanup_connection_file()
687 690
688 691
689 692 #--------------------------------------------------------------------------
690 693 # Channel management methods:
691 694 #--------------------------------------------------------------------------
692 695
693 696 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
694 697 """Starts the channels for this kernel.
695 698
696 699 This will create the channels if they do not exist and then start
697 700 them. If port numbers of 0 are being used (random ports) then you
698 701 must first call :method:`start_kernel`. If the channels have been
699 702 stopped and you call this, :class:`RuntimeError` will be raised.
700 703 """
701 704 if shell:
702 705 self.shell_channel.start()
703 706 if sub:
704 707 self.sub_channel.start()
705 708 if stdin:
706 709 self.stdin_channel.start()
707 710 self.shell_channel.allow_stdin = True
708 711 else:
709 712 self.shell_channel.allow_stdin = False
710 713 if hb:
711 714 self.hb_channel.start()
712 715
713 716 def stop_channels(self):
714 717 """Stops all the running channels for this kernel.
715 718 """
716 719 if self.shell_channel.is_alive():
717 720 self.shell_channel.stop()
718 721 if self.sub_channel.is_alive():
719 722 self.sub_channel.stop()
720 723 if self.stdin_channel.is_alive():
721 724 self.stdin_channel.stop()
722 725 if self.hb_channel.is_alive():
723 726 self.hb_channel.stop()
724 727
725 728 @property
726 729 def channels_running(self):
727 730 """Are any of the channels created and running?"""
728 731 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
729 732 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
730 733
731 734 #--------------------------------------------------------------------------
732 735 # Kernel process management methods:
733 736 #--------------------------------------------------------------------------
734 737
735 738 def cleanup_connection_file(self):
736 739 """cleanup connection file *if we wrote it*
737 740
738 741 Will not raise if the connection file was already removed somehow.
739 742 """
740 743 if self._connection_file_written:
741 744 # cleanup connection files on full shutdown of kernel we started
742 745 self._connection_file_written = False
743 746 try:
744 747 os.remove(self.connection_file)
745 except OSError:
748 except (IOError, OSError):
749 pass
750
751 self._cleanup_ipc_files()
752
753 def _cleanup_ipc_files(self):
754 """cleanup ipc files if we wrote them"""
755 if self.transport != 'ipc':
756 return
757 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
758 ipcfile = "%s-%i" % (self.ip, port)
759 try:
760 os.remove(ipcfile)
761 except (IOError, OSError):
746 762 pass
747 763
748 764 def load_connection_file(self):
749 765 """load connection info from JSON dict in self.connection_file"""
750 766 with open(self.connection_file) as f:
751 767 cfg = json.loads(f.read())
752 768
769 from pprint import pprint
770 pprint(cfg)
771 self.transport = cfg.get('transport', 'tcp')
753 772 self.ip = cfg['ip']
754 773 self.shell_port = cfg['shell_port']
755 774 self.stdin_port = cfg['stdin_port']
756 775 self.iopub_port = cfg['iopub_port']
757 776 self.hb_port = cfg['hb_port']
758 777 self.session.key = str_to_bytes(cfg['key'])
759 778
760 779 def write_connection_file(self):
761 780 """write connection info to JSON dict in self.connection_file"""
762 781 if self._connection_file_written:
763 782 return
764 783 self.connection_file,cfg = write_connection_file(self.connection_file,
765 ip=self.ip, key=self.session.key,
784 transport=self.transport, ip=self.ip, key=self.session.key,
766 785 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
767 786 shell_port=self.shell_port, hb_port=self.hb_port)
768 787 # write_connection_file also sets default ports:
769 788 self.shell_port = cfg['shell_port']
770 789 self.stdin_port = cfg['stdin_port']
771 790 self.iopub_port = cfg['iopub_port']
772 791 self.hb_port = cfg['hb_port']
773 792
774 793 self._connection_file_written = True
775 794
776 795 def start_kernel(self, **kw):
777 796 """Starts a kernel process and configures the manager to use it.
778 797
779 798 If random ports (port=0) are being used, this method must be called
780 799 before the channels are created.
781 800
782 801 Parameters:
783 802 -----------
784 803 launcher : callable, optional (default None)
785 804 A custom function for launching the kernel process (generally a
786 805 wrapper around ``entry_point.base_launch_kernel``). In most cases,
787 806 it should not be necessary to use this parameter.
788 807
789 808 **kw : optional
790 809 See respective options for IPython and Python kernels.
791 810 """
792 if self.ip not in LOCAL_IPS:
811 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
793 812 raise RuntimeError("Can only launch a kernel on a local interface. "
794 813 "Make sure that the '*_address' attributes are "
795 814 "configured properly. "
796 815 "Currently valid addresses are: %s"%LOCAL_IPS
797 816 )
798 817
799 818 # write connection file / get default ports
800 819 self.write_connection_file()
801 820
802 821 self._launch_args = kw.copy()
803 822 launch_kernel = kw.pop('launcher', None)
804 823 if launch_kernel is None:
805 824 from ipkernel import launch_kernel
806 825 self.kernel = launch_kernel(fname=self.connection_file, **kw)
807 826
808 827 def shutdown_kernel(self, restart=False):
809 828 """ Attempts to the stop the kernel process cleanly. If the kernel
810 829 cannot be stopped, it is killed, if possible.
811 830 """
812 831 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
813 832 if sys.platform == 'win32':
814 833 self.kill_kernel()
815 834 return
816 835
817 836 # Pause the heart beat channel if it exists.
818 837 if self._hb_channel is not None:
819 838 self._hb_channel.pause()
820 839
821 840 # Don't send any additional kernel kill messages immediately, to give
822 841 # the kernel a chance to properly execute shutdown actions. Wait for at
823 842 # most 1s, checking every 0.1s.
824 843 self.shell_channel.shutdown(restart=restart)
825 844 for i in range(10):
826 845 if self.is_alive:
827 846 time.sleep(0.1)
828 847 else:
829 848 break
830 849 else:
831 850 # OK, we've waited long enough.
832 851 if self.has_kernel:
833 852 self.kill_kernel()
834 853
835 854 if not restart and self._connection_file_written:
836 855 # cleanup connection files on full shutdown of kernel we started
837 856 self._connection_file_written = False
838 857 try:
839 858 os.remove(self.connection_file)
840 859 except IOError:
841 860 pass
842 861
843 862 def restart_kernel(self, now=False, **kw):
844 863 """Restarts a kernel with the arguments that were used to launch it.
845 864
846 865 If the old kernel was launched with random ports, the same ports will be
847 866 used for the new kernel.
848 867
849 868 Parameters
850 869 ----------
851 870 now : bool, optional
852 871 If True, the kernel is forcefully restarted *immediately*, without
853 872 having a chance to do any cleanup action. Otherwise the kernel is
854 873 given 1s to clean up before a forceful restart is issued.
855 874
856 875 In all cases the kernel is restarted, the only difference is whether
857 876 it is given a chance to perform a clean shutdown or not.
858 877
859 878 **kw : optional
860 879 Any options specified here will replace those used to launch the
861 880 kernel.
862 881 """
863 882 if self._launch_args is None:
864 883 raise RuntimeError("Cannot restart the kernel. "
865 884 "No previous call to 'start_kernel'.")
866 885 else:
867 886 # Stop currently running kernel.
868 887 if self.has_kernel:
869 888 if now:
870 889 self.kill_kernel()
871 890 else:
872 891 self.shutdown_kernel(restart=True)
873 892
874 893 # Start new kernel.
875 894 self._launch_args.update(kw)
876 895 self.start_kernel(**self._launch_args)
877 896
878 897 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
879 898 # unless there is some delay here.
880 899 if sys.platform == 'win32':
881 900 time.sleep(0.2)
882 901
883 902 @property
884 903 def has_kernel(self):
885 904 """Returns whether a kernel process has been specified for the kernel
886 905 manager.
887 906 """
888 907 return self.kernel is not None
889 908
890 909 def kill_kernel(self):
891 910 """ Kill the running kernel. """
892 911 if self.has_kernel:
893 912 # Pause the heart beat channel if it exists.
894 913 if self._hb_channel is not None:
895 914 self._hb_channel.pause()
896 915
897 916 # Attempt to kill the kernel.
898 917 try:
899 918 self.kernel.kill()
900 919 except OSError, e:
901 920 # In Windows, we will get an Access Denied error if the process
902 921 # has already terminated. Ignore it.
903 922 if sys.platform == 'win32':
904 923 if e.winerror != 5:
905 924 raise
906 925 # On Unix, we may get an ESRCH error if the process has already
907 926 # terminated. Ignore it.
908 927 else:
909 928 from errno import ESRCH
910 929 if e.errno != ESRCH:
911 930 raise
912 931 self.kernel = None
913 932 else:
914 933 raise RuntimeError("Cannot kill kernel. No kernel is running!")
915 934
916 935 def interrupt_kernel(self):
917 936 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
918 937 well supported on all platforms.
919 938 """
920 939 if self.has_kernel:
921 940 if sys.platform == 'win32':
922 941 from parentpoller import ParentPollerWindows as Poller
923 942 Poller.send_interrupt(self.kernel.win32_interrupt_event)
924 943 else:
925 944 self.kernel.send_signal(signal.SIGINT)
926 945 else:
927 946 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
928 947
929 948 def signal_kernel(self, signum):
930 949 """ Sends a signal to the kernel. Note that since only SIGTERM is
931 950 supported on Windows, this function is only useful on Unix systems.
932 951 """
933 952 if self.has_kernel:
934 953 self.kernel.send_signal(signum)
935 954 else:
936 955 raise RuntimeError("Cannot signal kernel. No kernel is running!")
937 956
938 957 @property
939 958 def is_alive(self):
940 959 """Is the kernel process still running?"""
941 960 if self.has_kernel:
942 961 if self.kernel.poll() is None:
943 962 return True
944 963 else:
945 964 return False
946 965 elif self._hb_channel is not None:
947 966 # We didn't start the kernel with this KernelManager so we
948 967 # use the heartbeat.
949 968 return self._hb_channel.is_beating()
950 969 else:
951 970 # no heartbeat and not local, we can't tell if it's running,
952 971 # so naively return True
953 972 return True
954 973
955 974 #--------------------------------------------------------------------------
956 975 # Channels used for communication with the kernel:
957 976 #--------------------------------------------------------------------------
958 977
978 def _make_url(self, port):
979 """make a zmq url with a port"""
980 if self.transport == 'tcp':
981 return "tcp://%s:%i" % (self.ip, port)
982 else:
983 return "%s://%s-%s" % (self.transport, self.ip, port)
984
959 985 @property
960 986 def shell_channel(self):
961 987 """Get the REQ socket channel object to make requests of the kernel."""
962 988 if self._shell_channel is None:
963 989 self._shell_channel = self.shell_channel_class(self.context,
964 self.session,
965 (self.ip, self.shell_port))
990 self.session,
991 self._make_url(self.shell_port),
992 )
966 993 return self._shell_channel
967 994
968 995 @property
969 996 def sub_channel(self):
970 997 """Get the SUB socket channel object."""
971 998 if self._sub_channel is None:
972 999 self._sub_channel = self.sub_channel_class(self.context,
973 1000 self.session,
974 (self.ip, self.iopub_port))
1001 self._make_url(self.iopub_port),
1002 )
975 1003 return self._sub_channel
976 1004
977 1005 @property
978 1006 def stdin_channel(self):
979 1007 """Get the REP socket channel object to handle stdin (raw_input)."""
980 1008 if self._stdin_channel is None:
981 1009 self._stdin_channel = self.stdin_channel_class(self.context,
982 self.session,
983 (self.ip, self.stdin_port))
1010 self.session,
1011 self._make_url(self.stdin_port),
1012 )
984 1013 return self._stdin_channel
985 1014
986 1015 @property
987 1016 def hb_channel(self):
988 1017 """Get the heartbeat socket channel object to check that the
989 1018 kernel is alive."""
990 1019 if self._hb_channel is None:
991 1020 self._hb_channel = self.hb_channel_class(self.context,
992 self.session,
993 (self.ip, self.hb_port))
1021 self.session,
1022 self._make_url(self.hb_port),
1023 )
994 1024 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now