##// END OF EJS Templates
Merge pull request #1262 from minrk/hbgil...
Min RK -
r5887:0d05c58e merge
parent child Browse files
Show More
@@ -0,0 +1,56 b''
1 {
2 "metadata": {
3 "name": "gilsleep"
4 },
5 "nbformat": 2,
6 "worksheets": [
7 {
8 "cells": [
9 {
10 "cell_type": "markdown",
11 "source": [
12 "Holding the GIL for too long could disrupt the heartbeat due to non-copying sends.",
13 "",
14 "The following cell repeatedly calls a function that holds the GIL for five seconds.",
15 "",
16 "The heartbeat will fail after a few iterations prior to fixing Issue [#1260](https://github.com/ipython/ipython/issues/1260)."
17 ]
18 },
19 {
20 "cell_type": "code",
21 "collapsed": false,
22 "input": [
23 "import sys",
24 "import time",
25 "",
26 "from cython import inline",
27 "",
28 "def gilsleep(t):",
29 " \"\"\"gil-holding sleep with cython.inline\"\"\"",
30 " code = '\\n'.join([",
31 " 'from posix cimport unistd',",
32 " 'unistd.sleep(t)',",
33 " ])",
34 " while True:",
35 " inline(code, quiet=True, t=t)",
36 " print time.time()",
37 " sys.stdout.flush() # this is important",
38 "",
39 "gilsleep(5)"
40 ],
41 "language": "python",
42 "outputs": [],
43 "prompt_number": 1
44 },
45 {
46 "cell_type": "code",
47 "collapsed": true,
48 "input": [],
49 "language": "python",
50 "outputs": [],
51 "prompt_number": " "
52 }
53 ]
54 }
55 ]
56 } No newline at end of file
@@ -0,0 +1,31 b''
1 """
2 Run this script in the qtconsole with one of:
3
4 %loadpy hb_gil.py
5
6 or
7 %run hb_gil.py
8
9 Holding the GIL for too long could disrupt the heartbeat.
10
11 See Issue #1260: https://github.com/ipython/ipython/issues/1260
12
13 """
14
15 import sys
16 import time
17
18 from cython import inline
19
20 def gilsleep(t):
21 """gil-holding sleep with cython.inline"""
22 code = '\n'.join([
23 'from posix cimport unistd',
24 'unistd.sleep(t)',
25 ])
26 while True:
27 inline(code, quiet=True, t=t)
28 print time.time()
29 sys.stdout.flush() # this is important
30
31 gilsleep(5)
@@ -1,174 +1,178 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18 import time
18 import time
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice
22 from zmq.devices import ThreadDevice
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat
26 from IPython.utils.traitlets import Set, Instance, CFloat
27
27
28 from IPython.parallel.util import asbytes
28 from IPython.parallel.util import asbytes
29
29
30 class Heart(object):
30 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
32 This is a simple wrapper with defaults for the most common
32 This is a simple wrapper with defaults for the most common
33 Device model for responding to heartbeats.
33 Device model for responding to heartbeats.
34
34
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 SUB/XREQ for in/out.
36 SUB/XREQ for in/out.
37
37
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 device=None
39 device=None
40 id=None
40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 # do not allow the device to share global Context.instance,
44 # which is the default behavior in pyzmq > 2.1.10
45 self.device.context_factory = zmq.Context
46
43 self.device.daemon=True
47 self.device.daemon=True
44 self.device.connect_in(in_addr)
48 self.device.connect_in(in_addr)
45 self.device.connect_out(out_addr)
49 self.device.connect_out(out_addr)
46 if in_type == zmq.SUB:
50 if in_type == zmq.SUB:
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
48 if heart_id is None:
52 if heart_id is None:
49 heart_id = uuid.uuid4().bytes
53 heart_id = uuid.uuid4().bytes
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
51 self.id = heart_id
55 self.id = heart_id
52
56
53 def start(self):
57 def start(self):
54 return self.device.start()
58 return self.device.start()
55
59
56
60
57 class HeartMonitor(LoggingConfigurable):
61 class HeartMonitor(LoggingConfigurable):
58 """A basic HeartMonitor class
62 """A basic HeartMonitor class
59 pingstream: a PUB stream
63 pingstream: a PUB stream
60 pongstream: an XREP stream
64 pongstream: an XREP stream
61 period: the period of the heartbeat in milliseconds"""
65 period: the period of the heartbeat in milliseconds"""
62
66
63 period=CFloat(1000, config=True,
67 period=CFloat(1000, config=True,
64 help='The frequency at which the Hub pings the engines for heartbeats '
68 help='The frequency at which the Hub pings the engines for heartbeats '
65 '(in ms)',
69 '(in ms)',
66 )
70 )
67
71
68 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
69 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
70 loop = Instance('zmq.eventloop.ioloop.IOLoop')
74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
71 def _loop_default(self):
75 def _loop_default(self):
72 return ioloop.IOLoop.instance()
76 return ioloop.IOLoop.instance()
73
77
74 # not settable:
78 # not settable:
75 hearts=Set()
79 hearts=Set()
76 responses=Set()
80 responses=Set()
77 on_probation=Set()
81 on_probation=Set()
78 last_ping=CFloat(0)
82 last_ping=CFloat(0)
79 _new_handlers = Set()
83 _new_handlers = Set()
80 _failure_handlers = Set()
84 _failure_handlers = Set()
81 lifetime = CFloat(0)
85 lifetime = CFloat(0)
82 tic = CFloat(0)
86 tic = CFloat(0)
83
87
84 def __init__(self, **kwargs):
88 def __init__(self, **kwargs):
85 super(HeartMonitor, self).__init__(**kwargs)
89 super(HeartMonitor, self).__init__(**kwargs)
86
90
87 self.pongstream.on_recv(self.handle_pong)
91 self.pongstream.on_recv(self.handle_pong)
88
92
89 def start(self):
93 def start(self):
90 self.tic = time.time()
94 self.tic = time.time()
91 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
92 self.caller.start()
96 self.caller.start()
93
97
94 def add_new_heart_handler(self, handler):
98 def add_new_heart_handler(self, handler):
95 """add a new handler for new hearts"""
99 """add a new handler for new hearts"""
96 self.log.debug("heartbeat::new_heart_handler: %s", handler)
100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
97 self._new_handlers.add(handler)
101 self._new_handlers.add(handler)
98
102
99 def add_heart_failure_handler(self, handler):
103 def add_heart_failure_handler(self, handler):
100 """add a new handler for heart failure"""
104 """add a new handler for heart failure"""
101 self.log.debug("heartbeat::new heart failure handler: %s", handler)
105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
102 self._failure_handlers.add(handler)
106 self._failure_handlers.add(handler)
103
107
104 def beat(self):
108 def beat(self):
105 self.pongstream.flush()
109 self.pongstream.flush()
106 self.last_ping = self.lifetime
110 self.last_ping = self.lifetime
107
111
108 toc = time.time()
112 toc = time.time()
109 self.lifetime += toc-self.tic
113 self.lifetime += toc-self.tic
110 self.tic = toc
114 self.tic = toc
111 self.log.debug("heartbeat::sending %s", self.lifetime)
115 self.log.debug("heartbeat::sending %s", self.lifetime)
112 goodhearts = self.hearts.intersection(self.responses)
116 goodhearts = self.hearts.intersection(self.responses)
113 missed_beats = self.hearts.difference(goodhearts)
117 missed_beats = self.hearts.difference(goodhearts)
114 heartfailures = self.on_probation.intersection(missed_beats)
118 heartfailures = self.on_probation.intersection(missed_beats)
115 newhearts = self.responses.difference(goodhearts)
119 newhearts = self.responses.difference(goodhearts)
116 map(self.handle_new_heart, newhearts)
120 map(self.handle_new_heart, newhearts)
117 map(self.handle_heart_failure, heartfailures)
121 map(self.handle_heart_failure, heartfailures)
118 self.on_probation = missed_beats.intersection(self.hearts)
122 self.on_probation = missed_beats.intersection(self.hearts)
119 self.responses = set()
123 self.responses = set()
120 # print self.on_probation, self.hearts
124 # print self.on_probation, self.hearts
121 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
122 self.pingstream.send(asbytes(str(self.lifetime)))
126 self.pingstream.send(asbytes(str(self.lifetime)))
123
127
124 def handle_new_heart(self, heart):
128 def handle_new_heart(self, heart):
125 if self._new_handlers:
129 if self._new_handlers:
126 for handler in self._new_handlers:
130 for handler in self._new_handlers:
127 handler(heart)
131 handler(heart)
128 else:
132 else:
129 self.log.info("heartbeat::yay, got new heart %s!", heart)
133 self.log.info("heartbeat::yay, got new heart %s!", heart)
130 self.hearts.add(heart)
134 self.hearts.add(heart)
131
135
132 def handle_heart_failure(self, heart):
136 def handle_heart_failure(self, heart):
133 if self._failure_handlers:
137 if self._failure_handlers:
134 for handler in self._failure_handlers:
138 for handler in self._failure_handlers:
135 try:
139 try:
136 handler(heart)
140 handler(heart)
137 except Exception as e:
141 except Exception as e:
138 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
142 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
139 pass
143 pass
140 else:
144 else:
141 self.log.info("heartbeat::Heart %s failed :(", heart)
145 self.log.info("heartbeat::Heart %s failed :(", heart)
142 self.hearts.remove(heart)
146 self.hearts.remove(heart)
143
147
144
148
145 def handle_pong(self, msg):
149 def handle_pong(self, msg):
146 "a heart just beat"
150 "a heart just beat"
147 current = asbytes(str(self.lifetime))
151 current = asbytes(str(self.lifetime))
148 last = asbytes(str(self.last_ping))
152 last = asbytes(str(self.last_ping))
149 if msg[1] == current:
153 if msg[1] == current:
150 delta = time.time()-self.tic
154 delta = time.time()-self.tic
151 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
155 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
152 self.responses.add(msg[0])
156 self.responses.add(msg[0])
153 elif msg[1] == last:
157 elif msg[1] == last:
154 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
158 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
155 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
159 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
156 self.responses.add(msg[0])
160 self.responses.add(msg[0])
157 else:
161 else:
158 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
162 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
159
163
160
164
161 if __name__ == '__main__':
165 if __name__ == '__main__':
162 loop = ioloop.IOLoop.instance()
166 loop = ioloop.IOLoop.instance()
163 context = zmq.Context()
167 context = zmq.Context()
164 pub = context.socket(zmq.PUB)
168 pub = context.socket(zmq.PUB)
165 pub.bind('tcp://127.0.0.1:5555')
169 pub.bind('tcp://127.0.0.1:5555')
166 xrep = context.socket(zmq.ROUTER)
170 xrep = context.socket(zmq.ROUTER)
167 xrep.bind('tcp://127.0.0.1:5556')
171 xrep.bind('tcp://127.0.0.1:5556')
168
172
169 outstream = zmqstream.ZMQStream(pub, loop)
173 outstream = zmqstream.ZMQStream(pub, loop)
170 instream = zmqstream.ZMQStream(xrep, loop)
174 instream = zmqstream.ZMQStream(xrep, loop)
171
175
172 hb = HeartMonitor(loop, outstream, instream)
176 hb = HeartMonitor(loop, outstream, instream)
173
177
174 loop.start()
178 loop.start()
@@ -1,303 +1,306 b''
1 """An Application for launching a kernel
1 """An Application for launching a kernel
2
2
3 Authors
3 Authors
4 -------
4 -------
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING.txt, distributed as part of this software.
11 # the file COPYING.txt, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import json
19 import json
20 import os
20 import os
21 import sys
21 import sys
22
22
23 # System library imports.
23 # System library imports.
24 import zmq
24 import zmq
25
25
26 # IPython imports.
26 # IPython imports.
27 from IPython.core.ultratb import FormattedTB
27 from IPython.core.ultratb import FormattedTB
28 from IPython.core.application import (
28 from IPython.core.application import (
29 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
29 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
30 )
30 )
31 from IPython.utils import io
31 from IPython.utils import io
32 from IPython.utils.localinterfaces import LOCALHOST
32 from IPython.utils.localinterfaces import LOCALHOST
33 from IPython.utils.path import filefind
33 from IPython.utils.path import filefind
34 from IPython.utils.py3compat import str_to_bytes
34 from IPython.utils.py3compat import str_to_bytes
35 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
35 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
36 DottedObjectName)
36 DottedObjectName)
37 from IPython.utils.importstring import import_item
37 from IPython.utils.importstring import import_item
38 # local imports
38 # local imports
39 from IPython.zmq.entry_point import write_connection_file
39 from IPython.zmq.entry_point import write_connection_file
40 from IPython.zmq.heartbeat import Heartbeat
40 from IPython.zmq.heartbeat import Heartbeat
41 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
41 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
42 from IPython.zmq.session import (
42 from IPython.zmq.session import (
43 Session, session_flags, session_aliases, default_secure,
43 Session, session_flags, session_aliases, default_secure,
44 )
44 )
45
45
46
46
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48 # Flags and Aliases
48 # Flags and Aliases
49 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
50
50
51 kernel_aliases = dict(base_aliases)
51 kernel_aliases = dict(base_aliases)
52 kernel_aliases.update({
52 kernel_aliases.update({
53 'ip' : 'KernelApp.ip',
53 'ip' : 'KernelApp.ip',
54 'hb' : 'KernelApp.hb_port',
54 'hb' : 'KernelApp.hb_port',
55 'shell' : 'KernelApp.shell_port',
55 'shell' : 'KernelApp.shell_port',
56 'iopub' : 'KernelApp.iopub_port',
56 'iopub' : 'KernelApp.iopub_port',
57 'stdin' : 'KernelApp.stdin_port',
57 'stdin' : 'KernelApp.stdin_port',
58 'f' : 'KernelApp.connection_file',
58 'f' : 'KernelApp.connection_file',
59 'parent': 'KernelApp.parent',
59 'parent': 'KernelApp.parent',
60 })
60 })
61 if sys.platform.startswith('win'):
61 if sys.platform.startswith('win'):
62 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
62 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
63
63
64 kernel_flags = dict(base_flags)
64 kernel_flags = dict(base_flags)
65 kernel_flags.update({
65 kernel_flags.update({
66 'no-stdout' : (
66 'no-stdout' : (
67 {'KernelApp' : {'no_stdout' : True}},
67 {'KernelApp' : {'no_stdout' : True}},
68 "redirect stdout to the null device"),
68 "redirect stdout to the null device"),
69 'no-stderr' : (
69 'no-stderr' : (
70 {'KernelApp' : {'no_stderr' : True}},
70 {'KernelApp' : {'no_stderr' : True}},
71 "redirect stderr to the null device"),
71 "redirect stderr to the null device"),
72 })
72 })
73
73
74 # inherit flags&aliases for Sessions
74 # inherit flags&aliases for Sessions
75 kernel_aliases.update(session_aliases)
75 kernel_aliases.update(session_aliases)
76 kernel_flags.update(session_flags)
76 kernel_flags.update(session_flags)
77
77
78
78
79
79
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81 # Application class for starting a Kernel
81 # Application class for starting a Kernel
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83
83
84 class KernelApp(BaseIPythonApplication):
84 class KernelApp(BaseIPythonApplication):
85 name='pykernel'
85 name='pykernel'
86 aliases = Dict(kernel_aliases)
86 aliases = Dict(kernel_aliases)
87 flags = Dict(kernel_flags)
87 flags = Dict(kernel_flags)
88 classes = [Session]
88 classes = [Session]
89 # the kernel class, as an importstring
89 # the kernel class, as an importstring
90 kernel_class = DottedObjectName('IPython.zmq.pykernel.Kernel')
90 kernel_class = DottedObjectName('IPython.zmq.pykernel.Kernel')
91 kernel = Any()
91 kernel = Any()
92 poller = Any() # don't restrict this even though current pollers are all Threads
92 poller = Any() # don't restrict this even though current pollers are all Threads
93 heartbeat = Instance(Heartbeat)
93 heartbeat = Instance(Heartbeat)
94 session = Instance('IPython.zmq.session.Session')
94 session = Instance('IPython.zmq.session.Session')
95 ports = Dict()
95 ports = Dict()
96
96
97 # inherit config file name from parent:
97 # inherit config file name from parent:
98 parent_appname = Unicode(config=True)
98 parent_appname = Unicode(config=True)
99 def _parent_appname_changed(self, name, old, new):
99 def _parent_appname_changed(self, name, old, new):
100 if self.config_file_specified:
100 if self.config_file_specified:
101 # it was manually specified, ignore
101 # it was manually specified, ignore
102 return
102 return
103 self.config_file_name = new.replace('-','_') + u'_config.py'
103 self.config_file_name = new.replace('-','_') + u'_config.py'
104 # don't let this count as specifying the config file
104 # don't let this count as specifying the config file
105 self.config_file_specified = False
105 self.config_file_specified = False
106
106
107 # connection info:
107 # connection info:
108 ip = Unicode(LOCALHOST, config=True,
108 ip = Unicode(LOCALHOST, config=True,
109 help="Set the IP or interface on which the kernel will listen.")
109 help="Set the IP or interface on which the kernel will listen.")
110 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
110 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
111 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
111 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
112 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
112 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
113 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
113 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
114 connection_file = Unicode('', config=True,
114 connection_file = Unicode('', config=True,
115 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
115 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
116
116
117 This file will contain the IP, ports, and authentication key needed to connect
117 This file will contain the IP, ports, and authentication key needed to connect
118 clients to this kernel. By default, this file will be created in the security-dir
118 clients to this kernel. By default, this file will be created in the security-dir
119 of the current profile, but can be specified by absolute path.
119 of the current profile, but can be specified by absolute path.
120 """)
120 """)
121
121
122 # streams, etc.
122 # streams, etc.
123 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
123 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
124 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
124 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
125 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
125 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
126 config=True, help="The importstring for the OutStream factory")
126 config=True, help="The importstring for the OutStream factory")
127 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
127 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
128 config=True, help="The importstring for the DisplayHook factory")
128 config=True, help="The importstring for the DisplayHook factory")
129
129
130 # polling
130 # polling
131 parent = Integer(0, config=True,
131 parent = Integer(0, config=True,
132 help="""kill this process if its parent dies. On Windows, the argument
132 help="""kill this process if its parent dies. On Windows, the argument
133 specifies the HANDLE of the parent process, otherwise it is simply boolean.
133 specifies the HANDLE of the parent process, otherwise it is simply boolean.
134 """)
134 """)
135 interrupt = Integer(0, config=True,
135 interrupt = Integer(0, config=True,
136 help="""ONLY USED ON WINDOWS
136 help="""ONLY USED ON WINDOWS
137 Interrupt this process when the parent is signalled.
137 Interrupt this process when the parent is signalled.
138 """)
138 """)
139
139
140 def init_crash_handler(self):
140 def init_crash_handler(self):
141 # Install minimal exception handling
141 # Install minimal exception handling
142 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
142 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
143 ostream=sys.__stdout__)
143 ostream=sys.__stdout__)
144
144
145 def init_poller(self):
145 def init_poller(self):
146 if sys.platform == 'win32':
146 if sys.platform == 'win32':
147 if self.interrupt or self.parent:
147 if self.interrupt or self.parent:
148 self.poller = ParentPollerWindows(self.interrupt, self.parent)
148 self.poller = ParentPollerWindows(self.interrupt, self.parent)
149 elif self.parent:
149 elif self.parent:
150 self.poller = ParentPollerUnix()
150 self.poller = ParentPollerUnix()
151
151
152 def _bind_socket(self, s, port):
152 def _bind_socket(self, s, port):
153 iface = 'tcp://%s' % self.ip
153 iface = 'tcp://%s' % self.ip
154 if port <= 0:
154 if port <= 0:
155 port = s.bind_to_random_port(iface)
155 port = s.bind_to_random_port(iface)
156 else:
156 else:
157 s.bind(iface + ':%i'%port)
157 s.bind(iface + ':%i'%port)
158 return port
158 return port
159
159
160 def load_connection_file(self):
160 def load_connection_file(self):
161 """load ip/port/hmac config from JSON connection file"""
161 """load ip/port/hmac config from JSON connection file"""
162 try:
162 try:
163 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
163 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
164 except IOError:
164 except IOError:
165 self.log.debug("Connection file not found: %s", self.connection_file)
165 self.log.debug("Connection file not found: %s", self.connection_file)
166 return
166 return
167 self.log.debug(u"Loading connection file %s", fname)
167 self.log.debug(u"Loading connection file %s", fname)
168 with open(fname) as f:
168 with open(fname) as f:
169 s = f.read()
169 s = f.read()
170 cfg = json.loads(s)
170 cfg = json.loads(s)
171 if self.ip == LOCALHOST and 'ip' in cfg:
171 if self.ip == LOCALHOST and 'ip' in cfg:
172 # not overridden by config or cl_args
172 # not overridden by config or cl_args
173 self.ip = cfg['ip']
173 self.ip = cfg['ip']
174 for channel in ('hb', 'shell', 'iopub', 'stdin'):
174 for channel in ('hb', 'shell', 'iopub', 'stdin'):
175 name = channel + '_port'
175 name = channel + '_port'
176 if getattr(self, name) == 0 and name in cfg:
176 if getattr(self, name) == 0 and name in cfg:
177 # not overridden by config or cl_args
177 # not overridden by config or cl_args
178 setattr(self, name, cfg[name])
178 setattr(self, name, cfg[name])
179 if 'key' in cfg:
179 if 'key' in cfg:
180 self.config.Session.key = str_to_bytes(cfg['key'])
180 self.config.Session.key = str_to_bytes(cfg['key'])
181
181
182 def write_connection_file(self):
182 def write_connection_file(self):
183 """write connection info to JSON file"""
183 """write connection info to JSON file"""
184 if os.path.basename(self.connection_file) == self.connection_file:
184 if os.path.basename(self.connection_file) == self.connection_file:
185 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
185 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
186 else:
186 else:
187 cf = self.connection_file
187 cf = self.connection_file
188 write_connection_file(cf, ip=self.ip, key=self.session.key,
188 write_connection_file(cf, ip=self.ip, key=self.session.key,
189 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
189 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
190 iopub_port=self.iopub_port)
190 iopub_port=self.iopub_port)
191
191
192 def init_connection_file(self):
192 def init_connection_file(self):
193 if not self.connection_file:
193 if not self.connection_file:
194 self.connection_file = "kernel-%s.json"%os.getpid()
194 self.connection_file = "kernel-%s.json"%os.getpid()
195 try:
195 try:
196 self.load_connection_file()
196 self.load_connection_file()
197 except Exception:
197 except Exception:
198 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
198 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
199 self.exit(1)
199 self.exit(1)
200
200
201 def init_sockets(self):
201 def init_sockets(self):
202 # Create a context, a session, and the kernel sockets.
202 # Create a context, a session, and the kernel sockets.
203 self.log.info("Starting the kernel at pid: %i", os.getpid())
203 self.log.info("Starting the kernel at pid: %i", os.getpid())
204 context = zmq.Context.instance()
204 context = zmq.Context.instance()
205 # Uncomment this to try closing the context.
205 # Uncomment this to try closing the context.
206 # atexit.register(context.term)
206 # atexit.register(context.term)
207
207
208 self.shell_socket = context.socket(zmq.ROUTER)
208 self.shell_socket = context.socket(zmq.ROUTER)
209 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
209 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
210 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
210 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
211
211
212 self.iopub_socket = context.socket(zmq.PUB)
212 self.iopub_socket = context.socket(zmq.PUB)
213 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
213 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
214 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
214 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
215
215
216 self.stdin_socket = context.socket(zmq.ROUTER)
216 self.stdin_socket = context.socket(zmq.ROUTER)
217 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
217 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
218 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
218 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
219
219
220 self.heartbeat = Heartbeat(context, (self.ip, self.hb_port))
220 # heartbeat doesn't share context, because it mustn't be blocked
221 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
222 hb_ctx = zmq.Context()
223 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
221 self.hb_port = self.heartbeat.port
224 self.hb_port = self.heartbeat.port
222 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
225 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
223
226
224 # Helper to make it easier to connect to an existing kernel.
227 # Helper to make it easier to connect to an existing kernel.
225 # set log-level to critical, to make sure it is output
228 # set log-level to critical, to make sure it is output
226 self.log.critical("To connect another client to this kernel, use:")
229 self.log.critical("To connect another client to this kernel, use:")
227
230
228 basename = os.path.basename(self.connection_file)
231 basename = os.path.basename(self.connection_file)
229 if basename == self.connection_file or \
232 if basename == self.connection_file or \
230 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
233 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
231 # use shortname
234 # use shortname
232 tail = basename
235 tail = basename
233 if self.profile != 'default':
236 if self.profile != 'default':
234 tail += " --profile %s" % self.profile
237 tail += " --profile %s" % self.profile
235 else:
238 else:
236 tail = self.connection_file
239 tail = self.connection_file
237 self.log.critical("--existing %s", tail)
240 self.log.critical("--existing %s", tail)
238
241
239
242
240 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
243 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
241 stdin=self.stdin_port, hb=self.hb_port)
244 stdin=self.stdin_port, hb=self.hb_port)
242
245
243 def init_session(self):
246 def init_session(self):
244 """create our session object"""
247 """create our session object"""
245 default_secure(self.config)
248 default_secure(self.config)
246 self.session = Session(config=self.config, username=u'kernel')
249 self.session = Session(config=self.config, username=u'kernel')
247
250
248 def init_blackhole(self):
251 def init_blackhole(self):
249 """redirects stdout/stderr to devnull if necessary"""
252 """redirects stdout/stderr to devnull if necessary"""
250 if self.no_stdout or self.no_stderr:
253 if self.no_stdout or self.no_stderr:
251 blackhole = file(os.devnull, 'w')
254 blackhole = file(os.devnull, 'w')
252 if self.no_stdout:
255 if self.no_stdout:
253 sys.stdout = sys.__stdout__ = blackhole
256 sys.stdout = sys.__stdout__ = blackhole
254 if self.no_stderr:
257 if self.no_stderr:
255 sys.stderr = sys.__stderr__ = blackhole
258 sys.stderr = sys.__stderr__ = blackhole
256
259
257 def init_io(self):
260 def init_io(self):
258 """Redirect input streams and set a display hook."""
261 """Redirect input streams and set a display hook."""
259 if self.outstream_class:
262 if self.outstream_class:
260 outstream_factory = import_item(str(self.outstream_class))
263 outstream_factory = import_item(str(self.outstream_class))
261 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
264 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
262 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
265 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
263 if self.displayhook_class:
266 if self.displayhook_class:
264 displayhook_factory = import_item(str(self.displayhook_class))
267 displayhook_factory = import_item(str(self.displayhook_class))
265 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
268 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
266
269
267 def init_kernel(self):
270 def init_kernel(self):
268 """Create the Kernel object itself"""
271 """Create the Kernel object itself"""
269 kernel_factory = import_item(str(self.kernel_class))
272 kernel_factory = import_item(str(self.kernel_class))
270 self.kernel = kernel_factory(config=self.config, session=self.session,
273 self.kernel = kernel_factory(config=self.config, session=self.session,
271 shell_socket=self.shell_socket,
274 shell_socket=self.shell_socket,
272 iopub_socket=self.iopub_socket,
275 iopub_socket=self.iopub_socket,
273 stdin_socket=self.stdin_socket,
276 stdin_socket=self.stdin_socket,
274 log=self.log
277 log=self.log
275 )
278 )
276 self.kernel.record_ports(self.ports)
279 self.kernel.record_ports(self.ports)
277
280
278 @catch_config_error
281 @catch_config_error
279 def initialize(self, argv=None):
282 def initialize(self, argv=None):
280 super(KernelApp, self).initialize(argv)
283 super(KernelApp, self).initialize(argv)
281 self.init_blackhole()
284 self.init_blackhole()
282 self.init_connection_file()
285 self.init_connection_file()
283 self.init_session()
286 self.init_session()
284 self.init_poller()
287 self.init_poller()
285 self.init_sockets()
288 self.init_sockets()
286 # writing connection file must be *after* init_sockets
289 # writing connection file must be *after* init_sockets
287 self.write_connection_file()
290 self.write_connection_file()
288 self.init_io()
291 self.init_io()
289 self.init_kernel()
292 self.init_kernel()
290 # flush stdout/stderr, so that anything written to these streams during
293 # flush stdout/stderr, so that anything written to these streams during
291 # initialization do not get associated with the first execution request
294 # initialization do not get associated with the first execution request
292 sys.stdout.flush()
295 sys.stdout.flush()
293 sys.stderr.flush()
296 sys.stderr.flush()
294
297
295 def start(self):
298 def start(self):
296 self.heartbeat.start()
299 self.heartbeat.start()
297 if self.poller is not None:
300 if self.poller is not None:
298 self.poller.start()
301 self.poller.start()
299 try:
302 try:
300 self.kernel.start()
303 self.kernel.start()
301 except KeyboardInterrupt:
304 except KeyboardInterrupt:
302 pass
305 pass
303
306
General Comments 0
You need to be logged in to leave comments. Login now