##// END OF EJS Templates
Merge pull request #1262 from minrk/hbgil...
Min RK -
r5887:0d05c58e merge
parent child Browse files
Show More
@@ -0,0 +1,56 b''
1 {
2 "metadata": {
3 "name": "gilsleep"
4 },
5 "nbformat": 2,
6 "worksheets": [
7 {
8 "cells": [
9 {
10 "cell_type": "markdown",
11 "source": [
12 "Holding the GIL for too long could disrupt the heartbeat due to non-copying sends.",
13 "",
14 "The following cell repeatedly calls a function that holds the GIL for five seconds.",
15 "",
16 "The heartbeat will fail after a few iterations prior to fixing Issue [#1260](https://github.com/ipython/ipython/issues/1260)."
17 ]
18 },
19 {
20 "cell_type": "code",
21 "collapsed": false,
22 "input": [
23 "import sys",
24 "import time",
25 "",
26 "from cython import inline",
27 "",
28 "def gilsleep(t):",
29 " \"\"\"gil-holding sleep with cython.inline\"\"\"",
30 " code = '\\n'.join([",
31 " 'from posix cimport unistd',",
32 " 'unistd.sleep(t)',",
33 " ])",
34 " while True:",
35 " inline(code, quiet=True, t=t)",
36 " print time.time()",
37 " sys.stdout.flush() # this is important",
38 "",
39 "gilsleep(5)"
40 ],
41 "language": "python",
42 "outputs": [],
43 "prompt_number": 1
44 },
45 {
46 "cell_type": "code",
47 "collapsed": true,
48 "input": [],
49 "language": "python",
50 "outputs": [],
51 "prompt_number": " "
52 }
53 ]
54 }
55 ]
56 } No newline at end of file
@@ -0,0 +1,31 b''
1 """
2 Run this script in the qtconsole with one of:
3
4 %loadpy hb_gil.py
5
6 or
7 %run hb_gil.py
8
9 Holding the GIL for too long could disrupt the heartbeat.
10
11 See Issue #1260: https://github.com/ipython/ipython/issues/1260
12
13 """
14
15 import sys
16 import time
17
18 from cython import inline
19
20 def gilsleep(t):
21 """gil-holding sleep with cython.inline"""
22 code = '\n'.join([
23 'from posix cimport unistd',
24 'unistd.sleep(t)',
25 ])
26 while True:
27 inline(code, quiet=True, t=t)
28 print time.time()
29 sys.stdout.flush() # this is important
30
31 gilsleep(5)
@@ -1,174 +1,178 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5
6 6 Authors:
7 7
8 8 * Min RK
9 9 """
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2010-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import print_function
18 18 import time
19 19 import uuid
20 20
21 21 import zmq
22 22 from zmq.devices import ThreadDevice
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.traitlets import Set, Instance, CFloat
27 27
28 28 from IPython.parallel.util import asbytes
29 29
30 30 class Heart(object):
31 31 """A basic heart object for responding to a HeartMonitor.
32 32 This is a simple wrapper with defaults for the most common
33 33 Device model for responding to heartbeats.
34 34
35 35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 36 SUB/XREQ for in/out.
37 37
38 38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 39 device=None
40 40 id=None
41 41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 # do not allow the device to share global Context.instance,
44 # which is the default behavior in pyzmq > 2.1.10
45 self.device.context_factory = zmq.Context
46
43 47 self.device.daemon=True
44 48 self.device.connect_in(in_addr)
45 49 self.device.connect_out(out_addr)
46 50 if in_type == zmq.SUB:
47 51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
48 52 if heart_id is None:
49 53 heart_id = uuid.uuid4().bytes
50 54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
51 55 self.id = heart_id
52 56
53 57 def start(self):
54 58 return self.device.start()
55 59
56 60
57 61 class HeartMonitor(LoggingConfigurable):
58 62 """A basic HeartMonitor class
59 63 pingstream: a PUB stream
60 64 pongstream: an XREP stream
61 65 period: the period of the heartbeat in milliseconds"""
62 66
63 67 period=CFloat(1000, config=True,
64 68 help='The frequency at which the Hub pings the engines for heartbeats '
65 69 '(in ms)',
66 70 )
67 71
68 72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
69 73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
70 74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
71 75 def _loop_default(self):
72 76 return ioloop.IOLoop.instance()
73 77
74 78 # not settable:
75 79 hearts=Set()
76 80 responses=Set()
77 81 on_probation=Set()
78 82 last_ping=CFloat(0)
79 83 _new_handlers = Set()
80 84 _failure_handlers = Set()
81 85 lifetime = CFloat(0)
82 86 tic = CFloat(0)
83 87
84 88 def __init__(self, **kwargs):
85 89 super(HeartMonitor, self).__init__(**kwargs)
86 90
87 91 self.pongstream.on_recv(self.handle_pong)
88 92
89 93 def start(self):
90 94 self.tic = time.time()
91 95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
92 96 self.caller.start()
93 97
94 98 def add_new_heart_handler(self, handler):
95 99 """add a new handler for new hearts"""
96 100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
97 101 self._new_handlers.add(handler)
98 102
99 103 def add_heart_failure_handler(self, handler):
100 104 """add a new handler for heart failure"""
101 105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
102 106 self._failure_handlers.add(handler)
103 107
104 108 def beat(self):
105 109 self.pongstream.flush()
106 110 self.last_ping = self.lifetime
107 111
108 112 toc = time.time()
109 113 self.lifetime += toc-self.tic
110 114 self.tic = toc
111 115 self.log.debug("heartbeat::sending %s", self.lifetime)
112 116 goodhearts = self.hearts.intersection(self.responses)
113 117 missed_beats = self.hearts.difference(goodhearts)
114 118 heartfailures = self.on_probation.intersection(missed_beats)
115 119 newhearts = self.responses.difference(goodhearts)
116 120 map(self.handle_new_heart, newhearts)
117 121 map(self.handle_heart_failure, heartfailures)
118 122 self.on_probation = missed_beats.intersection(self.hearts)
119 123 self.responses = set()
120 124 # print self.on_probation, self.hearts
121 125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
122 126 self.pingstream.send(asbytes(str(self.lifetime)))
123 127
124 128 def handle_new_heart(self, heart):
125 129 if self._new_handlers:
126 130 for handler in self._new_handlers:
127 131 handler(heart)
128 132 else:
129 133 self.log.info("heartbeat::yay, got new heart %s!", heart)
130 134 self.hearts.add(heart)
131 135
132 136 def handle_heart_failure(self, heart):
133 137 if self._failure_handlers:
134 138 for handler in self._failure_handlers:
135 139 try:
136 140 handler(heart)
137 141 except Exception as e:
138 142 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
139 143 pass
140 144 else:
141 145 self.log.info("heartbeat::Heart %s failed :(", heart)
142 146 self.hearts.remove(heart)
143 147
144 148
145 149 def handle_pong(self, msg):
146 150 "a heart just beat"
147 151 current = asbytes(str(self.lifetime))
148 152 last = asbytes(str(self.last_ping))
149 153 if msg[1] == current:
150 154 delta = time.time()-self.tic
151 155 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
152 156 self.responses.add(msg[0])
153 157 elif msg[1] == last:
154 158 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
155 159 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
156 160 self.responses.add(msg[0])
157 161 else:
158 162 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
159 163
160 164
161 165 if __name__ == '__main__':
162 166 loop = ioloop.IOLoop.instance()
163 167 context = zmq.Context()
164 168 pub = context.socket(zmq.PUB)
165 169 pub.bind('tcp://127.0.0.1:5555')
166 170 xrep = context.socket(zmq.ROUTER)
167 171 xrep.bind('tcp://127.0.0.1:5556')
168 172
169 173 outstream = zmqstream.ZMQStream(pub, loop)
170 174 instream = zmqstream.ZMQStream(xrep, loop)
171 175
172 176 hb = HeartMonitor(loop, outstream, instream)
173 177
174 178 loop.start()
@@ -1,303 +1,306 b''
1 1 """An Application for launching a kernel
2 2
3 3 Authors
4 4 -------
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING.txt, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import json
20 20 import os
21 21 import sys
22 22
23 23 # System library imports.
24 24 import zmq
25 25
26 26 # IPython imports.
27 27 from IPython.core.ultratb import FormattedTB
28 28 from IPython.core.application import (
29 29 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
30 30 )
31 31 from IPython.utils import io
32 32 from IPython.utils.localinterfaces import LOCALHOST
33 33 from IPython.utils.path import filefind
34 34 from IPython.utils.py3compat import str_to_bytes
35 35 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
36 36 DottedObjectName)
37 37 from IPython.utils.importstring import import_item
38 38 # local imports
39 39 from IPython.zmq.entry_point import write_connection_file
40 40 from IPython.zmq.heartbeat import Heartbeat
41 41 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
42 42 from IPython.zmq.session import (
43 43 Session, session_flags, session_aliases, default_secure,
44 44 )
45 45
46 46
47 47 #-----------------------------------------------------------------------------
48 48 # Flags and Aliases
49 49 #-----------------------------------------------------------------------------
50 50
51 51 kernel_aliases = dict(base_aliases)
52 52 kernel_aliases.update({
53 53 'ip' : 'KernelApp.ip',
54 54 'hb' : 'KernelApp.hb_port',
55 55 'shell' : 'KernelApp.shell_port',
56 56 'iopub' : 'KernelApp.iopub_port',
57 57 'stdin' : 'KernelApp.stdin_port',
58 58 'f' : 'KernelApp.connection_file',
59 59 'parent': 'KernelApp.parent',
60 60 })
61 61 if sys.platform.startswith('win'):
62 62 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
63 63
64 64 kernel_flags = dict(base_flags)
65 65 kernel_flags.update({
66 66 'no-stdout' : (
67 67 {'KernelApp' : {'no_stdout' : True}},
68 68 "redirect stdout to the null device"),
69 69 'no-stderr' : (
70 70 {'KernelApp' : {'no_stderr' : True}},
71 71 "redirect stderr to the null device"),
72 72 })
73 73
74 74 # inherit flags&aliases for Sessions
75 75 kernel_aliases.update(session_aliases)
76 76 kernel_flags.update(session_flags)
77 77
78 78
79 79
80 80 #-----------------------------------------------------------------------------
81 81 # Application class for starting a Kernel
82 82 #-----------------------------------------------------------------------------
83 83
84 84 class KernelApp(BaseIPythonApplication):
85 85 name='pykernel'
86 86 aliases = Dict(kernel_aliases)
87 87 flags = Dict(kernel_flags)
88 88 classes = [Session]
89 89 # the kernel class, as an importstring
90 90 kernel_class = DottedObjectName('IPython.zmq.pykernel.Kernel')
91 91 kernel = Any()
92 92 poller = Any() # don't restrict this even though current pollers are all Threads
93 93 heartbeat = Instance(Heartbeat)
94 94 session = Instance('IPython.zmq.session.Session')
95 95 ports = Dict()
96 96
97 97 # inherit config file name from parent:
98 98 parent_appname = Unicode(config=True)
99 99 def _parent_appname_changed(self, name, old, new):
100 100 if self.config_file_specified:
101 101 # it was manually specified, ignore
102 102 return
103 103 self.config_file_name = new.replace('-','_') + u'_config.py'
104 104 # don't let this count as specifying the config file
105 105 self.config_file_specified = False
106 106
107 107 # connection info:
108 108 ip = Unicode(LOCALHOST, config=True,
109 109 help="Set the IP or interface on which the kernel will listen.")
110 110 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
111 111 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
112 112 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
113 113 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
114 114 connection_file = Unicode('', config=True,
115 115 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
116 116
117 117 This file will contain the IP, ports, and authentication key needed to connect
118 118 clients to this kernel. By default, this file will be created in the security-dir
119 119 of the current profile, but can be specified by absolute path.
120 120 """)
121 121
122 122 # streams, etc.
123 123 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
124 124 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
125 125 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
126 126 config=True, help="The importstring for the OutStream factory")
127 127 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
128 128 config=True, help="The importstring for the DisplayHook factory")
129 129
130 130 # polling
131 131 parent = Integer(0, config=True,
132 132 help="""kill this process if its parent dies. On Windows, the argument
133 133 specifies the HANDLE of the parent process, otherwise it is simply boolean.
134 134 """)
135 135 interrupt = Integer(0, config=True,
136 136 help="""ONLY USED ON WINDOWS
137 137 Interrupt this process when the parent is signalled.
138 138 """)
139 139
140 140 def init_crash_handler(self):
141 141 # Install minimal exception handling
142 142 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
143 143 ostream=sys.__stdout__)
144 144
145 145 def init_poller(self):
146 146 if sys.platform == 'win32':
147 147 if self.interrupt or self.parent:
148 148 self.poller = ParentPollerWindows(self.interrupt, self.parent)
149 149 elif self.parent:
150 150 self.poller = ParentPollerUnix()
151 151
152 152 def _bind_socket(self, s, port):
153 153 iface = 'tcp://%s' % self.ip
154 154 if port <= 0:
155 155 port = s.bind_to_random_port(iface)
156 156 else:
157 157 s.bind(iface + ':%i'%port)
158 158 return port
159 159
160 160 def load_connection_file(self):
161 161 """load ip/port/hmac config from JSON connection file"""
162 162 try:
163 163 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
164 164 except IOError:
165 165 self.log.debug("Connection file not found: %s", self.connection_file)
166 166 return
167 167 self.log.debug(u"Loading connection file %s", fname)
168 168 with open(fname) as f:
169 169 s = f.read()
170 170 cfg = json.loads(s)
171 171 if self.ip == LOCALHOST and 'ip' in cfg:
172 172 # not overridden by config or cl_args
173 173 self.ip = cfg['ip']
174 174 for channel in ('hb', 'shell', 'iopub', 'stdin'):
175 175 name = channel + '_port'
176 176 if getattr(self, name) == 0 and name in cfg:
177 177 # not overridden by config or cl_args
178 178 setattr(self, name, cfg[name])
179 179 if 'key' in cfg:
180 180 self.config.Session.key = str_to_bytes(cfg['key'])
181 181
182 182 def write_connection_file(self):
183 183 """write connection info to JSON file"""
184 184 if os.path.basename(self.connection_file) == self.connection_file:
185 185 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
186 186 else:
187 187 cf = self.connection_file
188 188 write_connection_file(cf, ip=self.ip, key=self.session.key,
189 189 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
190 190 iopub_port=self.iopub_port)
191 191
192 192 def init_connection_file(self):
193 193 if not self.connection_file:
194 194 self.connection_file = "kernel-%s.json"%os.getpid()
195 195 try:
196 196 self.load_connection_file()
197 197 except Exception:
198 198 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
199 199 self.exit(1)
200 200
201 201 def init_sockets(self):
202 202 # Create a context, a session, and the kernel sockets.
203 203 self.log.info("Starting the kernel at pid: %i", os.getpid())
204 204 context = zmq.Context.instance()
205 205 # Uncomment this to try closing the context.
206 206 # atexit.register(context.term)
207 207
208 208 self.shell_socket = context.socket(zmq.ROUTER)
209 209 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
210 210 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
211 211
212 212 self.iopub_socket = context.socket(zmq.PUB)
213 213 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
214 214 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
215 215
216 216 self.stdin_socket = context.socket(zmq.ROUTER)
217 217 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
218 218 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
219
220 self.heartbeat = Heartbeat(context, (self.ip, self.hb_port))
219
220 # heartbeat doesn't share context, because it mustn't be blocked
221 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
222 hb_ctx = zmq.Context()
223 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
221 224 self.hb_port = self.heartbeat.port
222 225 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
223 226
224 227 # Helper to make it easier to connect to an existing kernel.
225 228 # set log-level to critical, to make sure it is output
226 229 self.log.critical("To connect another client to this kernel, use:")
227 230
228 231 basename = os.path.basename(self.connection_file)
229 232 if basename == self.connection_file or \
230 233 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
231 234 # use shortname
232 235 tail = basename
233 236 if self.profile != 'default':
234 237 tail += " --profile %s" % self.profile
235 238 else:
236 239 tail = self.connection_file
237 240 self.log.critical("--existing %s", tail)
238 241
239 242
240 243 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
241 244 stdin=self.stdin_port, hb=self.hb_port)
242 245
243 246 def init_session(self):
244 247 """create our session object"""
245 248 default_secure(self.config)
246 249 self.session = Session(config=self.config, username=u'kernel')
247 250
248 251 def init_blackhole(self):
249 252 """redirects stdout/stderr to devnull if necessary"""
250 253 if self.no_stdout or self.no_stderr:
251 254 blackhole = file(os.devnull, 'w')
252 255 if self.no_stdout:
253 256 sys.stdout = sys.__stdout__ = blackhole
254 257 if self.no_stderr:
255 258 sys.stderr = sys.__stderr__ = blackhole
256 259
257 260 def init_io(self):
258 261 """Redirect input streams and set a display hook."""
259 262 if self.outstream_class:
260 263 outstream_factory = import_item(str(self.outstream_class))
261 264 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
262 265 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
263 266 if self.displayhook_class:
264 267 displayhook_factory = import_item(str(self.displayhook_class))
265 268 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
266 269
267 270 def init_kernel(self):
268 271 """Create the Kernel object itself"""
269 272 kernel_factory = import_item(str(self.kernel_class))
270 273 self.kernel = kernel_factory(config=self.config, session=self.session,
271 274 shell_socket=self.shell_socket,
272 275 iopub_socket=self.iopub_socket,
273 276 stdin_socket=self.stdin_socket,
274 277 log=self.log
275 278 )
276 279 self.kernel.record_ports(self.ports)
277 280
278 281 @catch_config_error
279 282 def initialize(self, argv=None):
280 283 super(KernelApp, self).initialize(argv)
281 284 self.init_blackhole()
282 285 self.init_connection_file()
283 286 self.init_session()
284 287 self.init_poller()
285 288 self.init_sockets()
286 289 # writing connection file must be *after* init_sockets
287 290 self.write_connection_file()
288 291 self.init_io()
289 292 self.init_kernel()
290 293 # flush stdout/stderr, so that anything written to these streams during
291 294 # initialization do not get associated with the first execution request
292 295 sys.stdout.flush()
293 296 sys.stderr.flush()
294 297
295 298 def start(self):
296 299 self.heartbeat.start()
297 300 if self.poller is not None:
298 301 self.poller.start()
299 302 try:
300 303 self.kernel.start()
301 304 except KeyboardInterrupt:
302 305 pass
303 306
General Comments 0
You need to be logged in to leave comments. Login now