##// END OF EJS Templates
allow setting identities of Manager-created sockets...
MinRK -
Show More
@@ -1,390 +1,392 b''
1 1 """Base class to manage a running kernel
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2013 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 from __future__ import absolute_import
16 16
17 17 # Standard library imports
18 18 import signal
19 19 import sys
20 20 import time
21 21
22 22 import zmq
23 23
24 24 # Local imports
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.localinterfaces import LOCAL_IPS
27 27 from IPython.utils.traitlets import (
28 28 Any, Instance, Unicode, List, Bool,
29 29 )
30 30 from IPython.kernel import (
31 31 make_ipkernel_cmd,
32 32 launch_kernel,
33 33 )
34 34 from .connect import ConnectionFileMixin
35 35 from .zmq.session import Session
36 36 from .managerabc import (
37 37 KernelManagerABC
38 38 )
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Main kernel manager class
42 42 #-----------------------------------------------------------------------------
43 43
44 44 _socket_types = {
45 45 'hb' : zmq.REQ,
46 46 'shell' : zmq.DEALER,
47 47 'iopub' : zmq.SUB,
48 48 'stdin' : zmq.DEALER,
49 49 'control': zmq.DEALER,
50 50 }
51 51
52 52 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
53 53 """Manages a single kernel in a subprocess on this host.
54 54
55 55 This version starts kernels with Popen.
56 56 """
57 57
58 58 # The PyZMQ Context to use for communication with the kernel.
59 59 context = Instance(zmq.Context)
60 60 def _context_default(self):
61 61 return zmq.Context.instance()
62 62
63 63 # The Session to use for communication with the kernel.
64 64 session = Instance(Session)
65 65 def _session_default(self):
66 66 return Session(config=self.config)
67 67
68 68 # The kernel process with which the KernelManager is communicating.
69 69 # generally a Popen instance
70 70 kernel = Any()
71 71
72 72 kernel_cmd = List(Unicode, config=True,
73 73 help="""The Popen Command to launch the kernel.
74 74 Override this if you have a custom
75 75 """
76 76 )
77 77
78 78 def _kernel_cmd_changed(self, name, old, new):
79 79 self.ipython_kernel = False
80 80
81 81 ipython_kernel = Bool(True)
82 82
83 83 # Protected traits
84 84 _launch_args = Any()
85 85 _control_socket = Any()
86 86
87 87 autorestart = Bool(False, config=True,
88 88 help="""Should we autorestart the kernel if it dies."""
89 89 )
90 90
91 91 def __del__(self):
92 92 self._close_control_socket()
93 93 self.cleanup_connection_file()
94 94
95 95 #--------------------------------------------------------------------------
96 96 # Kernel restarter
97 97 #--------------------------------------------------------------------------
98 98
99 99 def start_restarter(self):
100 100 pass
101 101
102 102 def stop_restarter(self):
103 103 pass
104 104
105 105 #--------------------------------------------------------------------------
106 106 # Connection info
107 107 #--------------------------------------------------------------------------
108 108
109 109 def _make_url(self, channel):
110 110 """Make a ZeroMQ URL for a given channel."""
111 111 transport = self.transport
112 112 ip = self.ip
113 113 port = getattr(self, '%s_port' % channel)
114 114
115 115 if transport == 'tcp':
116 116 return "tcp://%s:%i" % (ip, port)
117 117 else:
118 118 return "%s://%s-%s" % (transport, ip, port)
119 119
120 def _create_connected_socket(self, channel):
120 def _create_connected_socket(self, channel, identity=None):
121 121 """Create a zmq Socket and connect it to the kernel."""
122 122 url = self._make_url(channel)
123 123 socket_type = _socket_types[channel]
124 sock = self.context.socket(socket_type)
125 124 self.log.info("Connecting to: %s" % url)
125 sock = self.context.socket(socket_type)
126 if identity:
127 sock.identity = identity
126 128 sock.connect(url)
127 129 return sock
128 130
129 def connect_iopub(self):
131 def connect_iopub(self, identity=None):
130 132 """return zmq Socket connected to the IOPub channel"""
131 sock = self._create_connected_socket('iopub')
133 sock = self._create_connected_socket('iopub', identity=identity)
132 134 sock.setsockopt(zmq.SUBSCRIBE, b'')
133 135 return sock
134 136
135 def connect_shell(self):
137 def connect_shell(self, identity=None):
136 138 """return zmq Socket connected to the Shell channel"""
137 return self._create_connected_socket('shell')
139 return self._create_connected_socket('shell', identity=identity)
138 140
139 def connect_stdin(self):
141 def connect_stdin(self, identity=None):
140 142 """return zmq Socket connected to the StdIn channel"""
141 return self._create_connected_socket('stdin')
143 return self._create_connected_socket('stdin', identity=identity)
142 144
143 def connect_hb(self):
145 def connect_hb(self, identity=None):
144 146 """return zmq Socket connected to the Heartbeat channel"""
145 return self._create_connected_socket('hb')
147 return self._create_connected_socket('hb', identity=identity)
146 148
147 def connect_control(self):
149 def connect_control(self, identity=None):
148 150 """return zmq Socket connected to the Heartbeat channel"""
149 return self._create_connected_socket('control')
151 return self._create_connected_socket('control', identity=identity)
150 152
151 153 #--------------------------------------------------------------------------
152 154 # Kernel management
153 155 #--------------------------------------------------------------------------
154 156
155 157 def format_kernel_cmd(self, **kw):
156 158 """format templated args (e.g. {connection_file})"""
157 159 if self.kernel_cmd:
158 160 cmd = self.kernel_cmd
159 161 else:
160 162 cmd = make_ipkernel_cmd(
161 163 'from IPython.kernel.zmq.kernelapp import main; main()',
162 164 **kw
163 165 )
164 166 ns = dict(connection_file=self.connection_file)
165 167 ns.update(self._launch_args)
166 168 return [ c.format(**ns) for c in cmd ]
167 169
168 170 def _launch_kernel(self, kernel_cmd, **kw):
169 171 """actually launch the kernel
170 172
171 173 override in a subclass to launch kernel subprocesses differently
172 174 """
173 175 return launch_kernel(kernel_cmd, **kw)
174 176
175 177 def _connect_control_socket(self):
176 178 if self._control_socket is None:
177 179 self._control_socket = self.connect_control()
178 180
179 181 def _close_control_socket(self):
180 182 if self._control_socket is None:
181 183 return
182 184 self._control_socket.linger = 100
183 185 self._control_socket.close()
184 186 self._control_socket = None
185 187
186 188
187 189 def start_kernel(self, **kw):
188 190 """Starts a kernel on this host in a separate process.
189 191
190 192 If random ports (port=0) are being used, this method must be called
191 193 before the channels are created.
192 194
193 195 Parameters:
194 196 -----------
195 197 **kw : optional
196 198 keyword arguments that are passed down to build the kernel_cmd
197 199 and launching the kernel (e.g. Popen kwargs).
198 200 """
199 201 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
200 202 raise RuntimeError("Can only launch a kernel on a local interface. "
201 203 "Make sure that the '*_address' attributes are "
202 204 "configured properly. "
203 205 "Currently valid addresses are: %s"%LOCAL_IPS
204 206 )
205 207
206 208 # write connection file / get default ports
207 209 self.write_connection_file()
208 210
209 211 # save kwargs for use in restart
210 212 self._launch_args = kw.copy()
211 213 # build the Popen cmd
212 214 kernel_cmd = self.format_kernel_cmd(**kw)
213 215 # launch the kernel subprocess
214 216 self.kernel = self._launch_kernel(kernel_cmd,
215 217 ipython_kernel=self.ipython_kernel,
216 218 **kw)
217 219 self.start_restarter()
218 220 self._connect_control_socket()
219 221
220 222 def _send_shutdown_request(self, restart=False):
221 223 """TODO: send a shutdown request via control channel"""
222 224 content = dict(restart=restart)
223 225 msg = self.session.msg("shutdown_request", content=content)
224 226 self.session.send(self._control_socket, msg)
225 227
226 228 def shutdown_kernel(self, now=False, restart=False):
227 229 """Attempts to the stop the kernel process cleanly.
228 230
229 231 This attempts to shutdown the kernels cleanly by:
230 232
231 233 1. Sending it a shutdown message over the shell channel.
232 234 2. If that fails, the kernel is shutdown forcibly by sending it
233 235 a signal.
234 236
235 237 Parameters:
236 238 -----------
237 239 now : bool
238 240 Should the kernel be forcible killed *now*. This skips the
239 241 first, nice shutdown attempt.
240 242 restart: bool
241 243 Will this kernel be restarted after it is shutdown. When this
242 244 is True, connection files will not be cleaned up.
243 245 """
244 246 # Stop monitoring for restarting while we shutdown.
245 247 self.stop_restarter()
246 248
247 249 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
248 250 if sys.platform == 'win32':
249 251 self._kill_kernel()
250 252 return
251 253
252 254 if now:
253 255 if self.has_kernel:
254 256 self._kill_kernel()
255 257 else:
256 258 # Don't send any additional kernel kill messages immediately, to give
257 259 # the kernel a chance to properly execute shutdown actions. Wait for at
258 260 # most 1s, checking every 0.1s.
259 261 self._send_shutdown_request(restart=restart)
260 262 for i in range(10):
261 263 if self.is_alive():
262 264 time.sleep(0.1)
263 265 else:
264 266 break
265 267 else:
266 268 # OK, we've waited long enough.
267 269 if self.has_kernel:
268 270 self._kill_kernel()
269 271
270 272 if not restart:
271 273 self.cleanup_connection_file()
272 274 self.cleanup_ipc_files()
273 275 else:
274 276 self.cleanup_ipc_files()
275 277
276 278 def restart_kernel(self, now=False, **kw):
277 279 """Restarts a kernel with the arguments that were used to launch it.
278 280
279 281 If the old kernel was launched with random ports, the same ports will be
280 282 used for the new kernel. The same connection file is used again.
281 283
282 284 Parameters
283 285 ----------
284 286 now : bool, optional
285 287 If True, the kernel is forcefully restarted *immediately*, without
286 288 having a chance to do any cleanup action. Otherwise the kernel is
287 289 given 1s to clean up before a forceful restart is issued.
288 290
289 291 In all cases the kernel is restarted, the only difference is whether
290 292 it is given a chance to perform a clean shutdown or not.
291 293
292 294 **kw : optional
293 295 Any options specified here will overwrite those used to launch the
294 296 kernel.
295 297 """
296 298 if self._launch_args is None:
297 299 raise RuntimeError("Cannot restart the kernel. "
298 300 "No previous call to 'start_kernel'.")
299 301 else:
300 302 # Stop currently running kernel.
301 303 self.shutdown_kernel(now=now, restart=True)
302 304
303 305 # Start new kernel.
304 306 self._launch_args.update(kw)
305 307 self.start_kernel(**self._launch_args)
306 308
307 309 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
308 310 # unless there is some delay here.
309 311 if sys.platform == 'win32':
310 312 time.sleep(0.2)
311 313
312 314 @property
313 315 def has_kernel(self):
314 316 """Has a kernel been started that we are managing."""
315 317 return self.kernel is not None
316 318
317 319 def _kill_kernel(self):
318 320 """Kill the running kernel.
319 321
320 322 This is a private method, callers should use shutdown_kernel(now=True).
321 323 """
322 324 if self.has_kernel:
323 325
324 326 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
325 327 # TerminateProcess() on Win32).
326 328 try:
327 329 self.kernel.kill()
328 330 except OSError as e:
329 331 # In Windows, we will get an Access Denied error if the process
330 332 # has already terminated. Ignore it.
331 333 if sys.platform == 'win32':
332 334 if e.winerror != 5:
333 335 raise
334 336 # On Unix, we may get an ESRCH error if the process has already
335 337 # terminated. Ignore it.
336 338 else:
337 339 from errno import ESRCH
338 340 if e.errno != ESRCH:
339 341 raise
340 342
341 343 # Block until the kernel terminates.
342 344 self.kernel.wait()
343 345 self.kernel = None
344 346 else:
345 347 raise RuntimeError("Cannot kill kernel. No kernel is running!")
346 348
347 349 def interrupt_kernel(self):
348 350 """Interrupts the kernel by sending it a signal.
349 351
350 352 Unlike ``signal_kernel``, this operation is well supported on all
351 353 platforms.
352 354 """
353 355 if self.has_kernel:
354 356 if sys.platform == 'win32':
355 357 from .zmq.parentpoller import ParentPollerWindows as Poller
356 358 Poller.send_interrupt(self.kernel.win32_interrupt_event)
357 359 else:
358 360 self.kernel.send_signal(signal.SIGINT)
359 361 else:
360 362 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
361 363
362 364 def signal_kernel(self, signum):
363 365 """Sends a signal to the kernel.
364 366
365 367 Note that since only SIGTERM is supported on Windows, this function is
366 368 only useful on Unix systems.
367 369 """
368 370 if self.has_kernel:
369 371 self.kernel.send_signal(signum)
370 372 else:
371 373 raise RuntimeError("Cannot signal kernel. No kernel is running!")
372 374
373 375 def is_alive(self):
374 376 """Is the kernel process still running?"""
375 377 if self.has_kernel:
376 378 if self.kernel.poll() is None:
377 379 return True
378 380 else:
379 381 return False
380 382 else:
381 383 # we don't have a kernel
382 384 return False
383 385
384 386
385 387 #-----------------------------------------------------------------------------
386 388 # ABC Registration
387 389 #-----------------------------------------------------------------------------
388 390
389 391 KernelManagerABC.register(KernelManager)
390 392
@@ -1,275 +1,283 b''
1 1 """A kernel manager for multiple kernels
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2013 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import absolute_import
20 20
21 21 import os
22 22 import uuid
23 23
24 24 import zmq
25 25
26 26 from IPython.config.configurable import LoggingConfigurable
27 27 from IPython.utils.importstring import import_item
28 28 from IPython.utils.traitlets import (
29 29 Instance, Dict, Unicode, Any, DottedObjectName, Bool
30 30 )
31 31
32 32 #-----------------------------------------------------------------------------
33 33 # Classes
34 34 #-----------------------------------------------------------------------------
35 35
36 36 class DuplicateKernelError(Exception):
37 37 pass
38 38
39 39
40 40
41 41 def kernel_method(f):
42 42 """decorator for proxying MKM.method(kernel_id) to individual KMs by ID"""
43 43 def wrapped(self, kernel_id, *args, **kwargs):
44 44 # get the kernel
45 45 km = self.get_kernel(kernel_id)
46 46 method = getattr(km, f.__name__)
47 47 # call the kernel's method
48 48 r = method(*args, **kwargs)
49 49 # last thing, call anything defined in the actual class method
50 50 # such as logging messages
51 51 f(self, kernel_id, *args, **kwargs)
52 52 # return the method result
53 53 return r
54 54 return wrapped
55 55
56 56
57 57 class MultiKernelManager(LoggingConfigurable):
58 58 """A class for managing multiple kernels."""
59 59
60 60 kernel_manager_class = DottedObjectName(
61 61 "IPython.kernel.ioloop.IOLoopKernelManager", config=True,
62 62 help="""The kernel manager class. This is configurable to allow
63 63 subclassing of the KernelManager for customized behavior.
64 64 """
65 65 )
66 66 def _kernel_manager_class_changed(self, name, old, new):
67 67 self.kernel_manager_factory = import_item(new)
68 68
69 69 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
70 70 def _kernel_manager_factory_default(self):
71 71 return import_item(self.kernel_manager_class)
72 72
73 73 context = Instance('zmq.Context')
74 74 def _context_default(self):
75 75 return zmq.Context.instance()
76 76
77 77 connection_dir = Unicode('')
78 78
79 79 _kernels = Dict()
80 80
81 81 def list_kernel_ids(self):
82 82 """Return a list of the kernel ids of the active kernels."""
83 83 # Create a copy so we can iterate over kernels in operations
84 84 # that delete keys.
85 85 return list(self._kernels.keys())
86 86
87 87 def __len__(self):
88 88 """Return the number of running kernels."""
89 89 return len(self.list_kernel_ids())
90 90
91 91 def __contains__(self, kernel_id):
92 92 return kernel_id in self._kernels
93 93
94 94 def start_kernel(self, **kwargs):
95 95 """Start a new kernel.
96 96
97 97 The caller can pick a kernel_id by passing one in as a keyword arg,
98 98 otherwise one will be picked using a uuid.
99 99
100 100 To silence the kernel's stdout/stderr, call this using::
101 101
102 102 km.start_kernel(stdout=PIPE, stderr=PIPE)
103 103
104 104 """
105 105 kernel_id = kwargs.pop('kernel_id', unicode(uuid.uuid4()))
106 106 if kernel_id in self:
107 107 raise DuplicateKernelError('Kernel already exists: %s' % kernel_id)
108 108 # kernel_manager_factory is the constructor for the KernelManager
109 109 # subclass we are using. It can be configured as any Configurable,
110 110 # including things like its transport and ip.
111 111 km = self.kernel_manager_factory(connection_file=os.path.join(
112 112 self.connection_dir, "kernel-%s.json" % kernel_id),
113 113 config=self.config, autorestart=True, log=self.log
114 114 )
115 115 km.start_kernel(**kwargs)
116 116 self._kernels[kernel_id] = km
117 117 return kernel_id
118 118
119 119 @kernel_method
120 120 def shutdown_kernel(self, kernel_id, now=False):
121 121 """Shutdown a kernel by its kernel uuid.
122 122
123 123 Parameters
124 124 ==========
125 125 kernel_id : uuid
126 126 The id of the kernel to shutdown.
127 127 now : bool
128 128 Should the kernel be shutdown forcibly using a signal.
129 129 """
130 130 self.log.info("Kernel shutdown: %s" % kernel_id)
131 131 del self._kernels[kernel_id]
132 132
133 133 def shutdown_all(self, now=False):
134 134 """Shutdown all kernels."""
135 135 for kid in self.list_kernel_ids():
136 136 self.shutdown_kernel(kid, now=now)
137 137
138 138 @kernel_method
139 139 def interrupt_kernel(self, kernel_id):
140 140 """Interrupt (SIGINT) the kernel by its uuid.
141 141
142 142 Parameters
143 143 ==========
144 144 kernel_id : uuid
145 145 The id of the kernel to interrupt.
146 146 """
147 147 self.log.info("Kernel interrupted: %s" % kernel_id)
148 148
149 149 @kernel_method
150 150 def signal_kernel(self, kernel_id, signum):
151 151 """Sends a signal to the kernel by its uuid.
152 152
153 153 Note that since only SIGTERM is supported on Windows, this function
154 154 is only useful on Unix systems.
155 155
156 156 Parameters
157 157 ==========
158 158 kernel_id : uuid
159 159 The id of the kernel to signal.
160 160 """
161 161 self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum))
162 162
163 163 @kernel_method
164 164 def restart_kernel(self, kernel_id):
165 165 """Restart a kernel by its uuid, keeping the same ports.
166 166
167 167 Parameters
168 168 ==========
169 169 kernel_id : uuid
170 170 The id of the kernel to interrupt.
171 171 """
172 172 self.log.info("Kernel restarted: %s" % kernel_id)
173 173
174 174 @kernel_method
175 175 def is_alive(self, kernel_id):
176 176 """Is the kernel alive.
177 177
178 178 This calls KernelManager.is_alive() which calls Popen.poll on the
179 179 actual kernel subprocess.
180 180
181 181 Parameters
182 182 ==========
183 183 kernel_id : uuid
184 184 The id of the kernel.
185 185 """
186 186
187 187 def _check_kernel_id(self, kernel_id):
188 188 """check that a kernel id is valid"""
189 189 if kernel_id not in self:
190 190 raise KeyError("Kernel with id not found: %s" % kernel_id)
191 191
192 192 def get_kernel(self, kernel_id):
193 193 """Get the single KernelManager object for a kernel by its uuid.
194 194
195 195 Parameters
196 196 ==========
197 197 kernel_id : uuid
198 198 The id of the kernel.
199 199 """
200 200 self._check_kernel_id(kernel_id)
201 201 return self._kernels[kernel_id]
202 202
203 203 @kernel_method
204 204 def get_connection_info(self, kernel_id):
205 205 """Return a dictionary of connection data for a kernel.
206 206
207 207 Parameters
208 208 ==========
209 209 kernel_id : uuid
210 210 The id of the kernel.
211 211
212 212 Returns
213 213 =======
214 214 connection_dict : dict
215 215 A dict of the information needed to connect to a kernel.
216 216 This includes the ip address and the integer port
217 217 numbers of the different channels (stdin_port, iopub_port,
218 218 shell_port, hb_port).
219 219 """
220 220
221 221 @kernel_method
222 def connect_iopub(self, kernel_id):
222 def connect_iopub(self, kernel_id, identity=None):
223 223 """Return a zmq Socket connected to the iopub channel.
224 224
225 225 Parameters
226 226 ==========
227 227 kernel_id : uuid
228 The id of the kernel.
228 The id of the kernel
229 identity : bytes (optional)
230 The zmq identity of the socket
229 231
230 232 Returns
231 233 =======
232 234 stream : zmq Socket or ZMQStream
233 235 """
234 236
235 237 @kernel_method
236 def connect_shell(self, kernel_id):
238 def connect_shell(self, kernel_id, identity=None):
237 239 """Return a zmq Socket connected to the shell channel.
238 240
239 241 Parameters
240 242 ==========
241 243 kernel_id : uuid
242 The id of the kernel.
244 The id of the kernel
245 identity : bytes (optional)
246 The zmq identity of the socket
243 247
244 248 Returns
245 249 =======
246 250 stream : zmq Socket or ZMQStream
247 251 """
248 252
249 253 @kernel_method
250 def connect_stdin(self, kernel_id):
254 def connect_stdin(self, kernel_id, identity=None):
251 255 """Return a zmq Socket connected to the stdin channel.
252 256
253 257 Parameters
254 258 ==========
255 259 kernel_id : uuid
256 The id of the kernel.
260 The id of the kernel
261 identity : bytes (optional)
262 The zmq identity of the socket
257 263
258 264 Returns
259 265 =======
260 266 stream : zmq Socket or ZMQStream
261 267 """
262 268
263 269 @kernel_method
264 def connect_hb(self, kernel_id):
270 def connect_hb(self, kernel_id, identity=None):
265 271 """Return a zmq Socket connected to the hb channel.
266 272
267 273 Parameters
268 274 ==========
269 275 kernel_id : uuid
270 The id of the kernel.
276 The id of the kernel
277 identity : bytes (optional)
278 The zmq identity of the socket
271 279
272 280 Returns
273 281 =======
274 282 stream : zmq Socket or ZMQStream
275 283 """
General Comments 0
You need to be logged in to leave comments. Login now