##// END OF EJS Templates
add control channel...
MinRK -
Show More
@@ -46,7 +46,7 b' from IPython.utils.traitlets import ('
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47
47
48 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
48 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
49 ip=LOCALHOST, key=b'', transport='tcp'):
49 control_port=0, ip=LOCALHOST, key=b'', transport='tcp'):
50 """Generates a JSON config file, including the selection of random ports.
50 """Generates a JSON config file, including the selection of random ports.
51
51
52 Parameters
52 Parameters
@@ -62,7 +62,10 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
62 The port to use for the SUB channel.
62 The port to use for the SUB channel.
63
63
64 stdin_port : int, optional
64 stdin_port : int, optional
65 The port to use for the REQ (raw input) channel.
65 The port to use for the ROUTER (raw input) channel.
66
67 control_port : int, optional
68 The port to use for the ROUTER (raw input) channel.
66
69
67 hb_port : int, optional
70 hb_port : int, optional
68 The port to use for the hearbeat REP channel.
71 The port to use for the hearbeat REP channel.
@@ -81,8 +84,11 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
81 # Find open ports as necessary.
84 # Find open ports as necessary.
82
85
83 ports = []
86 ports = []
84 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
87 ports_needed = int(shell_port <= 0) + \
85 int(stdin_port <= 0) + int(hb_port <= 0)
88 int(iopub_port <= 0) + \
89 int(stdin_port <= 0) + \
90 int(control_port <= 0) + \
91 int(hb_port <= 0)
86 if transport == 'tcp':
92 if transport == 'tcp':
87 for i in range(ports_needed):
93 for i in range(ports_needed):
88 sock = socket.socket()
94 sock = socket.socket()
@@ -105,12 +111,15 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
105 iopub_port = ports.pop(0)
111 iopub_port = ports.pop(0)
106 if stdin_port <= 0:
112 if stdin_port <= 0:
107 stdin_port = ports.pop(0)
113 stdin_port = ports.pop(0)
114 if control_port <= 0:
115 control_port = ports.pop(0)
108 if hb_port <= 0:
116 if hb_port <= 0:
109 hb_port = ports.pop(0)
117 hb_port = ports.pop(0)
110
118
111 cfg = dict( shell_port=shell_port,
119 cfg = dict( shell_port=shell_port,
112 iopub_port=iopub_port,
120 iopub_port=iopub_port,
113 stdin_port=stdin_port,
121 stdin_port=stdin_port,
122 control_port=control_port,
114 hb_port=hb_port,
123 hb_port=hb_port,
115 )
124 )
116 cfg['ip'] = ip
125 cfg['ip'] = ip
@@ -346,6 +355,7 b' def tunnel_to_kernel(connection_info, sshserver, sshkey=None):'
346 #-----------------------------------------------------------------------------
355 #-----------------------------------------------------------------------------
347 # Mixin for classes that workw ith connection files
356 # Mixin for classes that workw ith connection files
348 #-----------------------------------------------------------------------------
357 #-----------------------------------------------------------------------------
358 port_names = [ "%s_port" % channel for channel in ('shell', 'stdin', 'iopub', 'hb', 'control')]
349
359
350 class ConnectionFileMixin(HasTraits):
360 class ConnectionFileMixin(HasTraits):
351 """Mixin for configurable classes that work with connection files"""
361 """Mixin for configurable classes that work with connection files"""
@@ -381,12 +391,29 b' class ConnectionFileMixin(HasTraits):'
381 shell_port = Integer(0)
391 shell_port = Integer(0)
382 iopub_port = Integer(0)
392 iopub_port = Integer(0)
383 stdin_port = Integer(0)
393 stdin_port = Integer(0)
394 control_port = Integer(0)
384 hb_port = Integer(0)
395 hb_port = Integer(0)
385
396
397 @property
398 def ports(self):
399 return [ getattr(self, name) for name in port_names ]
400
386 #--------------------------------------------------------------------------
401 #--------------------------------------------------------------------------
387 # Connection and ipc file management
402 # Connection and ipc file management
388 #--------------------------------------------------------------------------
403 #--------------------------------------------------------------------------
389
404
405 def get_connection_info(self):
406 """return the connection info as a dict"""
407 return dict(
408 transport=self.transport,
409 ip=self.ip,
410 shell_port=self.shell_port,
411 iopub_port=self.iopub_port,
412 stdin_port=self.stdin_port,
413 hb_port=self.hb_port,
414 control_port=self.control_port,
415 )
416
390 def cleanup_connection_file(self):
417 def cleanup_connection_file(self):
391 """Cleanup connection file *if we wrote it*
418 """Cleanup connection file *if we wrote it*
392
419
@@ -404,7 +431,7 b' class ConnectionFileMixin(HasTraits):'
404 """Cleanup ipc files if we wrote them."""
431 """Cleanup ipc files if we wrote them."""
405 if self.transport != 'ipc':
432 if self.transport != 'ipc':
406 return
433 return
407 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
434 for port in self.ports:
408 ipcfile = "%s-%i" % (self.ip, port)
435 ipcfile = "%s-%i" % (self.ip, port)
409 try:
436 try:
410 os.remove(ipcfile)
437 os.remove(ipcfile)
@@ -415,15 +442,16 b' class ConnectionFileMixin(HasTraits):'
415 """Write connection info to JSON dict in self.connection_file."""
442 """Write connection info to JSON dict in self.connection_file."""
416 if self._connection_file_written:
443 if self._connection_file_written:
417 return
444 return
445
418 self.connection_file,cfg = write_connection_file(self.connection_file,
446 self.connection_file, cfg = write_connection_file(self.connection_file,
419 transport=self.transport, ip=self.ip, key=self.session.key,
447 transport=self.transport, ip=self.ip, key=self.session.key,
420 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
448 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
421 shell_port=self.shell_port, hb_port=self.hb_port)
449 shell_port=self.shell_port, hb_port=self.hb_port,
450 control_port=self.control_port,
451 )
422 # write_connection_file also sets default ports:
452 # write_connection_file also sets default ports:
423 self.shell_port = cfg['shell_port']
453 for name in port_names:
424 self.stdin_port = cfg['stdin_port']
454 setattr(self, name, cfg[name])
425 self.iopub_port = cfg['iopub_port']
426 self.hb_port = cfg['hb_port']
427
455
428 self._connection_file_written = True
456 self._connection_file_written = True
429
457
@@ -434,10 +462,8 b' class ConnectionFileMixin(HasTraits):'
434
462
435 self.transport = cfg.get('transport', 'tcp')
463 self.transport = cfg.get('transport', 'tcp')
436 self.ip = cfg['ip']
464 self.ip = cfg['ip']
437 self.shell_port = cfg['shell_port']
465 for name in port_names:
438 self.stdin_port = cfg['stdin_port']
466 setattr(self, name, cfg[name])
439 self.iopub_port = cfg['iopub_port']
440 self.hb_port = cfg['hb_port']
441 self.session.key = str_to_bytes(cfg['key'])
467 self.session.key = str_to_bytes(cfg['key'])
442
468
443
469
@@ -46,6 +46,7 b' _socket_types = {'
46 'shell' : zmq.DEALER,
46 'shell' : zmq.DEALER,
47 'iopub' : zmq.SUB,
47 'iopub' : zmq.SUB,
48 'stdin' : zmq.DEALER,
48 'stdin' : zmq.DEALER,
49 'control': zmq.DEALER,
49 }
50 }
50
51
51 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
52 class KernelManager(LoggingConfigurable, ConnectionFileMixin):
@@ -80,13 +81,15 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
80 ipython_kernel = Bool(True)
81 ipython_kernel = Bool(True)
81
82
82 # Protected traits
83 # Protected traits
83 _launch_args = Any
84 _launch_args = Any()
85 _control_socket = Any()
84
86
85 autorestart = Bool(False, config=True,
87 autorestart = Bool(False, config=True,
86 help="""Should we autorestart the kernel if it dies."""
88 help="""Should we autorestart the kernel if it dies."""
87 )
89 )
88
90
89 def __del__(self):
91 def __del__(self):
92 self._close_control_socket()
90 self.cleanup_connection_file()
93 self.cleanup_connection_file()
91
94
92 #--------------------------------------------------------------------------
95 #--------------------------------------------------------------------------
@@ -103,17 +106,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
103 # Connection info
106 # Connection info
104 #--------------------------------------------------------------------------
107 #--------------------------------------------------------------------------
105
108
106 def get_connection_info(self):
107 """return the connection info as a dict"""
108 return dict(
109 transport=self.transport,
110 ip=self.ip,
111 shell_port=self.shell_port,
112 iopub_port=self.iopub_port,
113 stdin_port=self.stdin_port,
114 hb_port=self.hb_port,
115 )
116
117 def _make_url(self, channel):
109 def _make_url(self, channel):
118 """Make a ZeroMQ URL for a given channel."""
110 """Make a ZeroMQ URL for a given channel."""
119 transport = self.transport
111 transport = self.transport
@@ -152,6 +144,10 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
152 """return zmq Socket connected to the Heartbeat channel"""
144 """return zmq Socket connected to the Heartbeat channel"""
153 return self._create_connected_socket('hb')
145 return self._create_connected_socket('hb')
154
146
147 def connect_control(self):
148 """return zmq Socket connected to the Heartbeat channel"""
149 return self._create_connected_socket('control')
150
155 #--------------------------------------------------------------------------
151 #--------------------------------------------------------------------------
156 # Kernel management
152 # Kernel management
157 #--------------------------------------------------------------------------
153 #--------------------------------------------------------------------------
@@ -176,6 +172,18 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
176 """
172 """
177 return launch_kernel(kernel_cmd, **kw)
173 return launch_kernel(kernel_cmd, **kw)
178
174
175 def _connect_control_socket(self):
176 if self._control_socket is None:
177 self._control_socket = self.connect_control()
178
179 def _close_control_socket(self):
180 if self._control_socket is None:
181 return
182 self._control_socket.linger = 100
183 self._control_socket.close()
184 self._control_socket = None
185
186
179 def start_kernel(self, **kw):
187 def start_kernel(self, **kw):
180 """Starts a kernel on this host in a separate process.
188 """Starts a kernel on this host in a separate process.
181
189
@@ -207,10 +215,13 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
207 ipython_kernel=self.ipython_kernel,
215 ipython_kernel=self.ipython_kernel,
208 **kw)
216 **kw)
209 self.start_restarter()
217 self.start_restarter()
218 self._connect_control_socket()
210
219
211 def _send_shutdown_request(self, restart=False):
220 def _send_shutdown_request(self, restart=False):
212 """TODO: send a shutdown request via control channel"""
221 """TODO: send a shutdown request via control channel"""
213 raise NotImplementedError("Soft shutdown needs control channel")
222 content = dict(restart=restart)
223 msg = self.session.msg("shutdown_request", content=content)
224 self.session.send(self._control_socket, msg)
214
225
215 def shutdown_kernel(self, now=False, restart=False):
226 def shutdown_kernel(self, now=False, restart=False):
216 """Attempts to the stop the kernel process cleanly.
227 """Attempts to the stop the kernel process cleanly.
@@ -230,7 +241,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
230 Will this kernel be restarted after it is shutdown. When this
241 Will this kernel be restarted after it is shutdown. When this
231 is True, connection files will not be cleaned up.
242 is True, connection files will not be cleaned up.
232 """
243 """
233
234 # Stop monitoring for restarting while we shutdown.
244 # Stop monitoring for restarting while we shutdown.
235 self.stop_restarter()
245 self.stop_restarter()
236
246
@@ -239,10 +249,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
239 self._kill_kernel()
249 self._kill_kernel()
240 return
250 return
241
251
242 # bypass clean shutdown while
243 # FIXME: add control channel for clean shutdown
244 now = True
245
246 if now:
252 if now:
247 if self.has_kernel:
253 if self.has_kernel:
248 self._kill_kernel()
254 self._kill_kernel()
@@ -250,7 +256,6 b' class KernelManager(LoggingConfigurable, ConnectionFileMixin):'
250 # Don't send any additional kernel kill messages immediately, to give
256 # Don't send any additional kernel kill messages immediately, to give
251 # the kernel a chance to properly execute shutdown actions. Wait for at
257 # the kernel a chance to properly execute shutdown actions. Wait for at
252 # most 1s, checking every 0.1s.
258 # most 1s, checking every 0.1s.
253 # FIXME: this method is not yet implemented (need Control channel)
254 self._send_shutdown_request(restart=restart)
259 self._send_shutdown_request(restart=restart)
255 for i in range(10):
260 for i in range(10):
256 if self.is_alive():
261 if self.is_alive():
@@ -69,6 +69,7 b' kernel_aliases.update({'
69 'shell' : 'IPKernelApp.shell_port',
69 'shell' : 'IPKernelApp.shell_port',
70 'iopub' : 'IPKernelApp.iopub_port',
70 'iopub' : 'IPKernelApp.iopub_port',
71 'stdin' : 'IPKernelApp.stdin_port',
71 'stdin' : 'IPKernelApp.stdin_port',
72 'control' : 'IPKernelApp.control_port',
72 'f' : 'IPKernelApp.connection_file',
73 'f' : 'IPKernelApp.connection_file',
73 'parent': 'IPKernelApp.parent',
74 'parent': 'IPKernelApp.parent',
74 'transport': 'IPKernelApp.transport',
75 'transport': 'IPKernelApp.transport',
@@ -145,7 +146,8 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
145 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
146 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
146 shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]")
147 shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]")
147 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
148 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
148 stdin_port = Integer(0, config=True, help="set the stdin (DEALER) port [default: random]")
149 stdin_port = Integer(0, config=True, help="set the stdin (ROUTER) port [default: random]")
150 control_port = Integer(0, config=True, help="set the control (ROUTER) port [default: random]")
149 connection_file = Unicode('', config=True,
151 connection_file = Unicode('', config=True,
150 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
152 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
151
153
@@ -227,7 +229,7 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
227 if self.ip == self._ip_default() and 'ip' in cfg:
229 if self.ip == self._ip_default() and 'ip' in cfg:
228 # not overridden by config or cl_args
230 # not overridden by config or cl_args
229 self.ip = cfg['ip']
231 self.ip = cfg['ip']
230 for channel in ('hb', 'shell', 'iopub', 'stdin'):
232 for channel in ('hb', 'shell', 'iopub', 'stdin', 'control'):
231 name = channel + '_port'
233 name = channel + '_port'
232 if getattr(self, name) == 0 and name in cfg:
234 if getattr(self, name) == 0 and name in cfg:
233 # not overridden by config or cl_args
235 # not overridden by config or cl_args
@@ -241,7 +243,7 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
241 self.log.debug("Writing connection file: %s", cf)
243 self.log.debug("Writing connection file: %s", cf)
242 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
244 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
243 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
245 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
244 iopub_port=self.iopub_port)
246 iopub_port=self.iopub_port, control_port=self.control_port)
245
247
246 def cleanup_connection_file(self):
248 def cleanup_connection_file(self):
247 cf = self.abs_connection_file
249 cf = self.abs_connection_file
@@ -257,7 +259,7 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
257 """cleanup ipc files if we wrote them"""
259 """cleanup ipc files if we wrote them"""
258 if self.transport != 'ipc':
260 if self.transport != 'ipc':
259 return
261 return
260 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
262 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port, self.control_port):
261 ipcfile = "%s-%i" % (self.ip, port)
263 ipcfile = "%s-%i" % (self.ip, port)
262 try:
264 try:
263 os.remove(ipcfile)
265 os.remove(ipcfile)
@@ -292,6 +294,10 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
292 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
294 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
293 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
295 self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port)
294
296
297 self.control_socket = context.socket(zmq.ROUTER)
298 self.control_port = self._bind_socket(self.control_socket, self.control_port)
299 self.log.debug("control ROUTER Channel on port: %i" % self.control_port)
300
295 def init_heartbeat(self):
301 def init_heartbeat(self):
296 """start the heart beating"""
302 """start the heart beating"""
297 # heartbeat doesn't share context, because it mustn't be blocked
303 # heartbeat doesn't share context, because it mustn't be blocked
@@ -321,7 +327,8 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
321
327
322
328
323 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
329 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
324 stdin=self.stdin_port, hb=self.hb_port)
330 stdin=self.stdin_port, hb=self.hb_port,
331 control=self.control_port)
325
332
326 def init_session(self):
333 def init_session(self):
327 """create our session object"""
334 """create our session object"""
@@ -353,11 +360,12 b' class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):'
353 def init_kernel(self):
360 def init_kernel(self):
354 """Create the Kernel object itself"""
361 """Create the Kernel object itself"""
355 shell_stream = ZMQStream(self.shell_socket)
362 shell_stream = ZMQStream(self.shell_socket)
363 control_stream = ZMQStream(self.control_socket)
356
364
357 kernel_factory = import_item(str(self.kernel_class))
365 kernel_factory = import_item(str(self.kernel_class))
358
366
359 kernel = kernel_factory(config=self.config, session=self.session,
367 kernel = kernel_factory(config=self.config, session=self.session,
360 shell_streams=[shell_stream],
368 shell_streams=[shell_stream, control_stream],
361 iopub_socket=self.iopub_socket,
369 iopub_socket=self.iopub_socket,
362 stdin_socket=self.stdin_socket,
370 stdin_socket=self.stdin_socket,
363 log=self.log,
371 log=self.log,
General Comments 0
You need to be logged in to leave comments. Login now