##// END OF EJS Templates
add control channel...
MinRK -
Show More
@@ -46,7 +46,7 b' from IPython.utils.traitlets import ('
46 46 #-----------------------------------------------------------------------------
47 47
48 48 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
49 ip=LOCALHOST, key=b'', transport='tcp'):
49 control_port=0, ip=LOCALHOST, key=b'', transport='tcp'):
50 50 """Generates a JSON config file, including the selection of random ports.
51 51
52 52 Parameters
@@ -62,7 +62,10 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
62 62 The port to use for the SUB channel.
63 63
64 64 stdin_port : int, optional
65 The port to use for the REQ (raw input) channel.
65 The port to use for the ROUTER (raw input) channel.
66
67 control_port : int, optional
68 The port to use for the ROUTER (raw input) channel.
66 69
67 70 hb_port : int, optional
68 71 The port to use for the hearbeat REP channel.
@@ -81,8 +84,11 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
81 84 # Find open ports as necessary.
82 85
83 86 ports = []
84 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
85 int(stdin_port <= 0) + int(hb_port <= 0)
87 ports_needed = int(shell_port <= 0) + \
88 int(iopub_port <= 0) + \
89 int(stdin_port <= 0) + \
90 int(control_port <= 0) + \
91 int(hb_port <= 0)
86 92 if transport == 'tcp':
87 93 for i in range(ports_needed):
88 94 sock = socket.socket()
@@ -105,12 +111,15 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
105 111 iopub_port = ports.pop(0)
106 112 if stdin_port <= 0:
107 113 stdin_port = ports.pop(0)
114 if control_port <= 0:
115 control_port = ports.pop(0)
108 116 if hb_port <= 0:
109 117 hb_port = ports.pop(0)
110 118
111 119 cfg = dict( shell_port=shell_port,
112 120 iopub_port=iopub_port,
113 121 stdin_port=stdin_port,
122 control_port=control_port,
114 123 hb_port=hb_port,
115 124 )
116 125 cfg['ip'] = ip
@@ -346,6 +355,7 b' def tunnel_to_kernel(connection_info, sshserver, sshkey=None):'
346 355 #-----------------------------------------------------------------------------
347 356 # Mixin for classes that workw ith connection files
348 357 #-----------------------------------------------------------------------------
358 port_names = [ "%s_port" % channel for channel in ('shell', 'stdin', 'iopub', 'hb', 'control')]
349 359
350 360 class ConnectionFileMixin(HasTraits):
351 361 """Mixin for configurable classes that work with connection files"""
@@ -381,12 +391,29 b' class ConnectionFileMixin(HasTraits):'
381 391 shell_port = Integer(0)
382 392 iopub_port = Integer(0)
383 393 stdin_port = Integer(0)
394 control_port = Integer(0)
384 395 hb_port = Integer(0)
385 396
397 @property
398 def ports(self):
399 return [ getattr(self, name) for name in port_names ]
400
386 401 #--------------------------------------------------------------------------
387 402 # Connection and ipc file management
388 403 #--------------------------------------------------------------------------
389 404
405 def get_connection_info(self):
406 """return the connection info as a dict"""
407 return dict(
408 transport=self.transport,
409 ip=self.ip,
410 shell_port=self.shell_port,
411 iopub_port=self.iopub_port,
412 stdin_port=self.stdin_port,
413 hb_port=self.hb_port,
414 control_port=self.control_port,
415 )
416
390 417 def cleanup_connection_file(self):
391 418 """Cleanup connection file *if we wrote it*
392 419
@@ -404,7 +431,7 b' class ConnectionFileMixin(HasTraits):'
404 431 """Cleanup ipc files if we wrote them."""
405 432 if self.transport != 'ipc':
406 433 return
407 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
434 for port in self.ports:
408 435 ipcfile = "%s-%i" % (self.ip, port)
409 436 try:
410 437 os.remove(ipcfile)
@@ -415,15 +442,16 b' class ConnectionFileMixin(HasTraits):'
415 442 """Write connection info to JSON dict in self.connection_file."""
416 443 if self._connection_file_written:
417 444 return
418 self.connection_file,cfg = write_connection_file(self.connection_file,
445
446 self.connection_file, cfg = write_connection_file(self.connection_file,
419 447 transport=self.transport, ip=self.ip, key=self.session.key,
420 448 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
421 shell_port=self.shell_port, hb_port=self.hb_port)
449 shell_port=self.shell_port, hb_port=self.hb_port,
450 control_port=self.control_port,
451 )
422 452 # write_connection_file also sets default ports:
423 self.shell_port = cfg['shell_port']
424 self.stdin_port = cfg['stdin_port']
425 self.iopub_port = cfg['iopub_port']
426 self.hb_port = cfg['hb_port']
453 for name in port_names:
454 setattr(self, name, cfg[name])
427 455
428 456 self._connection_file_written = True
429 457
@@ -434,10 +462,8 b' class ConnectionFileMixin(HasTraits):'
434 462
435 463 self.transport = cfg.get('transport', 'tcp')
436 464 self.ip = cfg['ip']
437 self.shell_port = cfg['shell_port']
438 self.stdin_port = cfg['stdin_port']
439 self.iopub_port = cfg['iopub_port']
440 self.hb_port = cfg['hb_port']
465 for name in port_names:
466 setattr(self, name, cfg[name])
441 467 self.session.key = str_to_bytes(cfg['key'])
442 468
443 469
@@ -46,6 +46,7 b' _socket_types = {'
46 46 'shell' : zmq.DEALER,
47 47 'iopub' : zmq.SUB,
48 48 'stdin' : zmq.DEALER,
49 'control': zmq.DEALER,
49 50 }
50 51
51 52 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
@@ -80,13 +81,15 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
80 81 ipython_kernel = Bool(True)
81 82
82 83 # Protected traits
83 _launch_args = Any
84 _launch_args = Any()
85 _control_socket = Any()
84 86
85 87 autorestart = Bool(False, config=True,
86 88 help="""Should we autorestart the kernel if it dies."""
87 89 )
88 90
89 91 def __del__(self):
92 self._close_control_socket()
90 93 self.cleanup_connection_file()
91 94
92 95 #--------------------------------------------------------------------------
@@ -103,17 +106,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
103 106 # Connection info
104 107 #--------------------------------------------------------------------------
105 108
106 def get_connection_info(self):
107 """return the connection info as a dict"""
108 return dict(
109 transport=self.transport,
110 ip=self.ip,
111 shell_port=self.shell_port,
112 iopub_port=self.iopub_port,
113 stdin_port=self.stdin_port,
114 hb_port=self.hb_port,
115 )
116
117 109 def _make_url(self, channel):
118 110 """Make a ZeroMQ URL for a given channel."""
119 111 transport = self.transport
@@ -152,6 +144,10 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
152 144 """return zmq Socket connected to the Heartbeat channel"""
153 145 return self._create_connected_socket('hb')
154 146
147 def connect_control(self):
148 """return zmq Socket connected to the Heartbeat channel"""
149 return self._create_connected_socket('control')
150
155 151 #--------------------------------------------------------------------------
156 152 # Kernel management
157 153 #--------------------------------------------------------------------------
@@ -176,6 +172,18 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
176 172 """
177 173 return launch_kernel(kernel_cmd, **kw)
178 174
175 def _connect_control_socket(self):
176 if self._control_socket is None:
177 self._control_socket = self.connect_control()
178
179 def _close_control_socket(self):
180 if self._control_socket is None:
181 return
182 self._control_socket.linger = 100
183 self._control_socket.close()
184 self._control_socket = None
185
186
179 187 def start_kernel(self, **kw):
180 188 """Starts a kernel on this host in a separate process.
181 189
@@ -207,10 +215,13 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
207 215 ipython_kernel=self.ipython_kernel,
208 216 **kw)
209 217 self.start_restarter()
218 self._connect_control_socket()
210 219
211 220 def _send_shutdown_request(self, restart=False):
212 221 """TODO: send a shutdown request via control channel"""
213 raise NotImplementedError("Soft shutdown needs control channel")
222 content = dict(restart=restart)
223 msg = self.session.msg("shutdown_request", content=content)
224 self.session.send(self._control_socket, msg)
214 225
215 226 def shutdown_kernel(self, now=False, restart=False):
216 227 """Attempts to the stop the kernel process cleanly.
@@ -230,7 +241,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
230 241 Will this kernel be restarted after it is shutdown. When this
231 242 is True, connection files will not be cleaned up.
232 243 """
233
234 244 # Stop monitoring for restarting while we shutdown.
235 245 self.stop_restarter()
236 246
@@ -239,10 +249,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
239 249 self._kill_kernel()
240 250 return
241 251
242 # bypass clean shutdown while
243 # FIXME: add control channel for clean shutdown
244 now = True
245
246 252 if now:
247 253 if self.has_kernel:
248 254 self._kill_kernel()
@@ -250,7 +256,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
250 256 # Don't send any additional kernel kill messages immediately, to give
251 257 # the kernel a chance to properly execute shutdown actions. Wait for at
252 258 # most 1s, checking every 0.1s.
253 # FIXME: this method is not yet implemented (need Control channel)
254 259 self._send_shutdown_request(restart=restart)
255 260 for i in range(10):
256 261 if self.is_alive():
@@ -69,6 +69,7 b' kernel_aliases.update({'
69 69 'shell' : 'IPKernelApp.shell_port',
70 70 'iopub' : 'IPKernelApp.iopub_port',
71 71 'stdin' : 'IPKernelApp.stdin_port',
72 'control' : 'IPKernelApp.control_port',
72 73 'f' : 'IPKernelApp.connection_file',
73 74 'parent': 'IPKernelApp.parent',
74 75 'transport': 'IPKernelApp.transport',
@@ -145,7 +146,8 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
145 146 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
146 147 shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]")
147 148 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
148 stdin_port = Integer(0, config=True, help="set the stdin (DEALER) port [default: random]")
149 stdin_port = Integer(0, config=True, help="set the stdin (ROUTER) port [default: random]")
150 control_port = Integer(0, config=True, help="set the control (ROUTER) port [default: random]")
149 151 connection_file = Unicode('', config=True,
150 152 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
151 153
@@ -227,7 +229,7 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
227 229 if self.ip == self._ip_default() and 'ip' in cfg:
228 230 # not overridden by config or cl_args
229 231 self.ip = cfg['ip']
230 for channel in ('hb', 'shell', 'iopub', 'stdin'):
232 for channel in ('hb', 'shell', 'iopub', 'stdin', 'control'):
231 233 name = channel + '_port'
232 234 if getattr(self, name) == 0 and name in cfg:
233 235 # not overridden by config or cl_args
@@ -241,7 +243,7 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
241 243 self.log.debug("Writing connection file: %s", cf)
242 244 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
243 245 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
244 iopub_port=self.iopub_port)
246 iopub_port=self.iopub_port, control_port=self.control_port)
245 247
246 248 def cleanup_connection_file(self):
247 249 cf = self.abs_connection_file
@@ -257,7 +259,7 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
257 259 """cleanup ipc files if we wrote them"""
258 260 if self.transport != 'ipc':
259 261 return
260 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
262 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port, self.control_port):
261 263 ipcfile = "%s-%i" % (self.ip, port)
262 264 try:
263 265 os.remove(ipcfile)
@@ -282,15 +284,19 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
282 284
283 285 self.shell_socket = context.socket(zmq.ROUTER)
284 286 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
285 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
287 self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port)
286 288
287 289 self.iopub_socket = context.socket(zmq.PUB)
288 290 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
289 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
291 self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port)
290 292
291 293 self.stdin_socket = context.socket(zmq.ROUTER)
292 294 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
293 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
295 self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port)
296
297 self.control_socket = context.socket(zmq.ROUTER)
298 self.control_port = self._bind_socket(self.control_socket, self.control_port)
299 self.log.debug("control ROUTER Channel on port: %i" % self.control_port)
294 300
295 301 def init_heartbeat(self):
296 302 """start the heart beating"""
@@ -299,7 +305,7 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
299 305 hb_ctx = zmq.Context()
300 306 self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port))
301 307 self.hb_port = self.heartbeat.port
302 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
308 self.log.debug("Heartbeat REP Channel on port: %i" % self.hb_port)
303 309 self.heartbeat.start()
304 310
305 311 # Helper to make it easier to connect to an existing kernel.
@@ -321,7 +327,8 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
321 327
322 328
323 329 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
324 stdin=self.stdin_port, hb=self.hb_port)
330 stdin=self.stdin_port, hb=self.hb_port,
331 control=self.control_port)
325 332
326 333 def init_session(self):
327 334 """create our session object"""
@@ -353,11 +360,12 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
353 360 def init_kernel(self):
354 361 """Create the Kernel object itself"""
355 362 shell_stream = ZMQStream(self.shell_socket)
363 control_stream = ZMQStream(self.control_socket)
356 364
357 365 kernel_factory = import_item(str(self.kernel_class))
358 366
359 367 kernel = kernel_factory(config=self.config, session=self.session,
360 shell_streams=[shell_stream],
368 shell_streams=[shell_stream, control_stream],
361 369 iopub_socket=self.iopub_socket,
362 370 stdin_socket=self.stdin_socket,
363 371 log=self.log,
General Comments 0
You need to be logged in to leave comments. Login now