##// END OF EJS Templates
Merge pull request #1312 from minrk/hb...
Min RK -
r5954:1487f2f1 merge
parent child Browse files
Show More
@@ -1,676 +1,678 b''
1 """Tornado handlers for the notebook.
1 """Tornado handlers for the notebook.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import logging
19 import logging
20 import Cookie
20 import Cookie
21 import time
21 import time
22 import uuid
22 import uuid
23
23
24 from tornado import web
24 from tornado import web
25 from tornado import websocket
25 from tornado import websocket
26
26
27 from zmq.eventloop import ioloop
27 from zmq.eventloop import ioloop
28 from zmq.utils import jsonapi
28 from zmq.utils import jsonapi
29
29
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31 from IPython.zmq.session import Session
31 from IPython.zmq.session import Session
32 from IPython.lib.security import passwd_check
32 from IPython.lib.security import passwd_check
33
33
34 try:
34 try:
35 from docutils.core import publish_string
35 from docutils.core import publish_string
36 except ImportError:
36 except ImportError:
37 publish_string = None
37 publish_string = None
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Monkeypatch for Tornado <= 2.1.1 - Remove when no longer necessary!
40 # Monkeypatch for Tornado <= 2.1.1 - Remove when no longer necessary!
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 # Google Chrome, as of release 16, changed its websocket protocol number. The
43 # Google Chrome, as of release 16, changed its websocket protocol number. The
44 # parts tornado cares about haven't really changed, so it's OK to continue
44 # parts tornado cares about haven't really changed, so it's OK to continue
45 # accepting Chrome connections, but as of Tornado 2.1.1 (the currently released
45 # accepting Chrome connections, but as of Tornado 2.1.1 (the currently released
46 # version as of Oct 30/2011) the version check fails, see the issue report:
46 # version as of Oct 30/2011) the version check fails, see the issue report:
47
47
48 # https://github.com/facebook/tornado/issues/385
48 # https://github.com/facebook/tornado/issues/385
49
49
50 # This issue has been fixed in Tornado post 2.1.1:
50 # This issue has been fixed in Tornado post 2.1.1:
51
51
52 # https://github.com/facebook/tornado/commit/84d7b458f956727c3b0d6710
52 # https://github.com/facebook/tornado/commit/84d7b458f956727c3b0d6710
53
53
54 # Here we manually apply the same patch as above so that users of IPython can
54 # Here we manually apply the same patch as above so that users of IPython can
55 # continue to work with an officially released Tornado. We make the
55 # continue to work with an officially released Tornado. We make the
56 # monkeypatch version check as narrow as possible to limit its effects; once
56 # monkeypatch version check as narrow as possible to limit its effects; once
57 # Tornado 2.1.1 is no longer found in the wild we'll delete this code.
57 # Tornado 2.1.1 is no longer found in the wild we'll delete this code.
58
58
59 import tornado
59 import tornado
60
60
61 if tornado.version_info <= (2,1,1):
61 if tornado.version_info <= (2,1,1):
62
62
63 def _execute(self, transforms, *args, **kwargs):
63 def _execute(self, transforms, *args, **kwargs):
64 from tornado.websocket import WebSocketProtocol8, WebSocketProtocol76
64 from tornado.websocket import WebSocketProtocol8, WebSocketProtocol76
65
65
66 self.open_args = args
66 self.open_args = args
67 self.open_kwargs = kwargs
67 self.open_kwargs = kwargs
68
68
69 # The difference between version 8 and 13 is that in 8 the
69 # The difference between version 8 and 13 is that in 8 the
70 # client sends a "Sec-Websocket-Origin" header and in 13 it's
70 # client sends a "Sec-Websocket-Origin" header and in 13 it's
71 # simply "Origin".
71 # simply "Origin".
72 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"):
72 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"):
73 self.ws_connection = WebSocketProtocol8(self)
73 self.ws_connection = WebSocketProtocol8(self)
74 self.ws_connection.accept_connection()
74 self.ws_connection.accept_connection()
75
75
76 elif self.request.headers.get("Sec-WebSocket-Version"):
76 elif self.request.headers.get("Sec-WebSocket-Version"):
77 self.stream.write(tornado.escape.utf8(
77 self.stream.write(tornado.escape.utf8(
78 "HTTP/1.1 426 Upgrade Required\r\n"
78 "HTTP/1.1 426 Upgrade Required\r\n"
79 "Sec-WebSocket-Version: 8\r\n\r\n"))
79 "Sec-WebSocket-Version: 8\r\n\r\n"))
80 self.stream.close()
80 self.stream.close()
81
81
82 else:
82 else:
83 self.ws_connection = WebSocketProtocol76(self)
83 self.ws_connection = WebSocketProtocol76(self)
84 self.ws_connection.accept_connection()
84 self.ws_connection.accept_connection()
85
85
86 websocket.WebSocketHandler._execute = _execute
86 websocket.WebSocketHandler._execute = _execute
87 del _execute
87 del _execute
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # Decorator for disabling read-only handlers
90 # Decorator for disabling read-only handlers
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92
92
93 @decorator
93 @decorator
94 def not_if_readonly(f, self, *args, **kwargs):
94 def not_if_readonly(f, self, *args, **kwargs):
95 if self.application.read_only:
95 if self.application.read_only:
96 raise web.HTTPError(403, "Notebook server is read-only")
96 raise web.HTTPError(403, "Notebook server is read-only")
97 else:
97 else:
98 return f(self, *args, **kwargs)
98 return f(self, *args, **kwargs)
99
99
100 @decorator
100 @decorator
101 def authenticate_unless_readonly(f, self, *args, **kwargs):
101 def authenticate_unless_readonly(f, self, *args, **kwargs):
102 """authenticate this page *unless* readonly view is active.
102 """authenticate this page *unless* readonly view is active.
103
103
104 In read-only mode, the notebook list and print view should
104 In read-only mode, the notebook list and print view should
105 be accessible without authentication.
105 be accessible without authentication.
106 """
106 """
107
107
108 @web.authenticated
108 @web.authenticated
109 def auth_f(self, *args, **kwargs):
109 def auth_f(self, *args, **kwargs):
110 return f(self, *args, **kwargs)
110 return f(self, *args, **kwargs)
111
111
112 if self.application.read_only:
112 if self.application.read_only:
113 return f(self, *args, **kwargs)
113 return f(self, *args, **kwargs)
114 else:
114 else:
115 return auth_f(self, *args, **kwargs)
115 return auth_f(self, *args, **kwargs)
116
116
117 #-----------------------------------------------------------------------------
117 #-----------------------------------------------------------------------------
118 # Top-level handlers
118 # Top-level handlers
119 #-----------------------------------------------------------------------------
119 #-----------------------------------------------------------------------------
120
120
121 class RequestHandler(web.RequestHandler):
121 class RequestHandler(web.RequestHandler):
122 """RequestHandler with default variable setting."""
122 """RequestHandler with default variable setting."""
123
123
124 def render(*args, **kwargs):
124 def render(*args, **kwargs):
125 kwargs.setdefault('message', '')
125 kwargs.setdefault('message', '')
126 return web.RequestHandler.render(*args, **kwargs)
126 return web.RequestHandler.render(*args, **kwargs)
127
127
128 class AuthenticatedHandler(RequestHandler):
128 class AuthenticatedHandler(RequestHandler):
129 """A RequestHandler with an authenticated user."""
129 """A RequestHandler with an authenticated user."""
130
130
131 def get_current_user(self):
131 def get_current_user(self):
132 user_id = self.get_secure_cookie("username")
132 user_id = self.get_secure_cookie("username")
133 # For now the user_id should not return empty, but it could eventually
133 # For now the user_id should not return empty, but it could eventually
134 if user_id == '':
134 if user_id == '':
135 user_id = 'anonymous'
135 user_id = 'anonymous'
136 if user_id is None:
136 if user_id is None:
137 # prevent extra Invalid cookie sig warnings:
137 # prevent extra Invalid cookie sig warnings:
138 self.clear_cookie('username')
138 self.clear_cookie('username')
139 if not self.application.password and not self.application.read_only:
139 if not self.application.password and not self.application.read_only:
140 user_id = 'anonymous'
140 user_id = 'anonymous'
141 return user_id
141 return user_id
142
142
143 @property
143 @property
144 def logged_in(self):
144 def logged_in(self):
145 """Is a user currently logged in?
145 """Is a user currently logged in?
146
146
147 """
147 """
148 user = self.get_current_user()
148 user = self.get_current_user()
149 return (user and not user == 'anonymous')
149 return (user and not user == 'anonymous')
150
150
151 @property
151 @property
152 def login_available(self):
152 def login_available(self):
153 """May a user proceed to log in?
153 """May a user proceed to log in?
154
154
155 This returns True if login capability is available, irrespective of
155 This returns True if login capability is available, irrespective of
156 whether the user is already logged in or not.
156 whether the user is already logged in or not.
157
157
158 """
158 """
159 return bool(self.application.password)
159 return bool(self.application.password)
160
160
161 @property
161 @property
162 def read_only(self):
162 def read_only(self):
163 """Is the notebook read-only?
163 """Is the notebook read-only?
164
164
165 """
165 """
166 return self.application.read_only
166 return self.application.read_only
167
167
168 @property
168 @property
169 def ws_url(self):
169 def ws_url(self):
170 """websocket url matching the current request
170 """websocket url matching the current request
171
171
172 turns http[s]://host[:port] into
172 turns http[s]://host[:port] into
173 ws[s]://host[:port]
173 ws[s]://host[:port]
174 """
174 """
175 proto = self.request.protocol.replace('http', 'ws')
175 proto = self.request.protocol.replace('http', 'ws')
176 return "%s://%s" % (proto, self.request.host)
176 return "%s://%s" % (proto, self.request.host)
177
177
178
178
179 class AuthenticatedFileHandler(AuthenticatedHandler, web.StaticFileHandler):
179 class AuthenticatedFileHandler(AuthenticatedHandler, web.StaticFileHandler):
180 """static files should only be accessible when logged in"""
180 """static files should only be accessible when logged in"""
181
181
182 @authenticate_unless_readonly
182 @authenticate_unless_readonly
183 def get(self, path):
183 def get(self, path):
184 return web.StaticFileHandler.get(self, path)
184 return web.StaticFileHandler.get(self, path)
185
185
186
186
187 class ProjectDashboardHandler(AuthenticatedHandler):
187 class ProjectDashboardHandler(AuthenticatedHandler):
188
188
189 @authenticate_unless_readonly
189 @authenticate_unless_readonly
190 def get(self):
190 def get(self):
191 nbm = self.application.notebook_manager
191 nbm = self.application.notebook_manager
192 project = nbm.notebook_dir
192 project = nbm.notebook_dir
193 self.render(
193 self.render(
194 'projectdashboard.html', project=project,
194 'projectdashboard.html', project=project,
195 base_project_url=u'/', base_kernel_url=u'/',
195 base_project_url=u'/', base_kernel_url=u'/',
196 read_only=self.read_only,
196 read_only=self.read_only,
197 logged_in=self.logged_in,
197 logged_in=self.logged_in,
198 login_available=self.login_available
198 login_available=self.login_available
199 )
199 )
200
200
201
201
202 class LoginHandler(AuthenticatedHandler):
202 class LoginHandler(AuthenticatedHandler):
203
203
204 def _render(self, message=None):
204 def _render(self, message=None):
205 self.render('login.html',
205 self.render('login.html',
206 next=self.get_argument('next', default='/'),
206 next=self.get_argument('next', default='/'),
207 read_only=self.read_only,
207 read_only=self.read_only,
208 logged_in=self.logged_in,
208 logged_in=self.logged_in,
209 login_available=self.login_available,
209 login_available=self.login_available,
210 message=message
210 message=message
211 )
211 )
212
212
213 def get(self):
213 def get(self):
214 if self.current_user:
214 if self.current_user:
215 self.redirect(self.get_argument('next', default='/'))
215 self.redirect(self.get_argument('next', default='/'))
216 else:
216 else:
217 self._render()
217 self._render()
218
218
219 def post(self):
219 def post(self):
220 pwd = self.get_argument('password', default=u'')
220 pwd = self.get_argument('password', default=u'')
221 if self.application.password:
221 if self.application.password:
222 if passwd_check(self.application.password, pwd):
222 if passwd_check(self.application.password, pwd):
223 self.set_secure_cookie('username', str(uuid.uuid4()))
223 self.set_secure_cookie('username', str(uuid.uuid4()))
224 else:
224 else:
225 self._render(message={'error': 'Invalid password'})
225 self._render(message={'error': 'Invalid password'})
226 return
226 return
227
227
228 self.redirect(self.get_argument('next', default='/'))
228 self.redirect(self.get_argument('next', default='/'))
229
229
230
230
231 class LogoutHandler(AuthenticatedHandler):
231 class LogoutHandler(AuthenticatedHandler):
232
232
233 def get(self):
233 def get(self):
234 self.clear_cookie('username')
234 self.clear_cookie('username')
235 if self.login_available:
235 if self.login_available:
236 message = {'info': 'Successfully logged out.'}
236 message = {'info': 'Successfully logged out.'}
237 else:
237 else:
238 message = {'warning': 'Cannot log out. Notebook authentication '
238 message = {'warning': 'Cannot log out. Notebook authentication '
239 'is disabled.'}
239 'is disabled.'}
240
240
241 self.render('logout.html',
241 self.render('logout.html',
242 read_only=self.read_only,
242 read_only=self.read_only,
243 logged_in=self.logged_in,
243 logged_in=self.logged_in,
244 login_available=self.login_available,
244 login_available=self.login_available,
245 message=message)
245 message=message)
246
246
247
247
248 class NewHandler(AuthenticatedHandler):
248 class NewHandler(AuthenticatedHandler):
249
249
250 @web.authenticated
250 @web.authenticated
251 def get(self):
251 def get(self):
252 nbm = self.application.notebook_manager
252 nbm = self.application.notebook_manager
253 project = nbm.notebook_dir
253 project = nbm.notebook_dir
254 notebook_id = nbm.new_notebook()
254 notebook_id = nbm.new_notebook()
255 self.render(
255 self.render(
256 'notebook.html', project=project,
256 'notebook.html', project=project,
257 notebook_id=notebook_id,
257 notebook_id=notebook_id,
258 base_project_url=u'/', base_kernel_url=u'/',
258 base_project_url=u'/', base_kernel_url=u'/',
259 kill_kernel=False,
259 kill_kernel=False,
260 read_only=False,
260 read_only=False,
261 logged_in=self.logged_in,
261 logged_in=self.logged_in,
262 login_available=self.login_available,
262 login_available=self.login_available,
263 mathjax_url=self.application.ipython_app.mathjax_url,
263 mathjax_url=self.application.ipython_app.mathjax_url,
264 )
264 )
265
265
266
266
267 class NamedNotebookHandler(AuthenticatedHandler):
267 class NamedNotebookHandler(AuthenticatedHandler):
268
268
269 @authenticate_unless_readonly
269 @authenticate_unless_readonly
270 def get(self, notebook_id):
270 def get(self, notebook_id):
271 nbm = self.application.notebook_manager
271 nbm = self.application.notebook_manager
272 project = nbm.notebook_dir
272 project = nbm.notebook_dir
273 if not nbm.notebook_exists(notebook_id):
273 if not nbm.notebook_exists(notebook_id):
274 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_id)
274 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_id)
275
275
276 self.render(
276 self.render(
277 'notebook.html', project=project,
277 'notebook.html', project=project,
278 notebook_id=notebook_id,
278 notebook_id=notebook_id,
279 base_project_url=u'/', base_kernel_url=u'/',
279 base_project_url=u'/', base_kernel_url=u'/',
280 kill_kernel=False,
280 kill_kernel=False,
281 read_only=self.read_only,
281 read_only=self.read_only,
282 logged_in=self.logged_in,
282 logged_in=self.logged_in,
283 login_available=self.login_available,
283 login_available=self.login_available,
284 mathjax_url=self.application.ipython_app.mathjax_url,
284 mathjax_url=self.application.ipython_app.mathjax_url,
285 )
285 )
286
286
287
287
288 class PrintNotebookHandler(AuthenticatedHandler):
288 class PrintNotebookHandler(AuthenticatedHandler):
289
289
290 @authenticate_unless_readonly
290 @authenticate_unless_readonly
291 def get(self, notebook_id):
291 def get(self, notebook_id):
292 nbm = self.application.notebook_manager
292 nbm = self.application.notebook_manager
293 project = nbm.notebook_dir
293 project = nbm.notebook_dir
294 if not nbm.notebook_exists(notebook_id):
294 if not nbm.notebook_exists(notebook_id):
295 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_id)
295 raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_id)
296
296
297 self.render(
297 self.render(
298 'printnotebook.html', project=project,
298 'printnotebook.html', project=project,
299 notebook_id=notebook_id,
299 notebook_id=notebook_id,
300 base_project_url=u'/', base_kernel_url=u'/',
300 base_project_url=u'/', base_kernel_url=u'/',
301 kill_kernel=False,
301 kill_kernel=False,
302 read_only=self.read_only,
302 read_only=self.read_only,
303 logged_in=self.logged_in,
303 logged_in=self.logged_in,
304 login_available=self.login_available,
304 login_available=self.login_available,
305 mathjax_url=self.application.ipython_app.mathjax_url,
305 mathjax_url=self.application.ipython_app.mathjax_url,
306 )
306 )
307
307
308 #-----------------------------------------------------------------------------
308 #-----------------------------------------------------------------------------
309 # Kernel handlers
309 # Kernel handlers
310 #-----------------------------------------------------------------------------
310 #-----------------------------------------------------------------------------
311
311
312
312
313 class MainKernelHandler(AuthenticatedHandler):
313 class MainKernelHandler(AuthenticatedHandler):
314
314
315 @web.authenticated
315 @web.authenticated
316 def get(self):
316 def get(self):
317 km = self.application.kernel_manager
317 km = self.application.kernel_manager
318 self.finish(jsonapi.dumps(km.kernel_ids))
318 self.finish(jsonapi.dumps(km.kernel_ids))
319
319
320 @web.authenticated
320 @web.authenticated
321 def post(self):
321 def post(self):
322 km = self.application.kernel_manager
322 km = self.application.kernel_manager
323 notebook_id = self.get_argument('notebook', default=None)
323 notebook_id = self.get_argument('notebook', default=None)
324 kernel_id = km.start_kernel(notebook_id)
324 kernel_id = km.start_kernel(notebook_id)
325 data = {'ws_url':self.ws_url,'kernel_id':kernel_id}
325 data = {'ws_url':self.ws_url,'kernel_id':kernel_id}
326 self.set_header('Location', '/'+kernel_id)
326 self.set_header('Location', '/'+kernel_id)
327 self.finish(jsonapi.dumps(data))
327 self.finish(jsonapi.dumps(data))
328
328
329
329
330 class KernelHandler(AuthenticatedHandler):
330 class KernelHandler(AuthenticatedHandler):
331
331
332 SUPPORTED_METHODS = ('DELETE')
332 SUPPORTED_METHODS = ('DELETE')
333
333
334 @web.authenticated
334 @web.authenticated
335 def delete(self, kernel_id):
335 def delete(self, kernel_id):
336 km = self.application.kernel_manager
336 km = self.application.kernel_manager
337 km.kill_kernel(kernel_id)
337 km.kill_kernel(kernel_id)
338 self.set_status(204)
338 self.set_status(204)
339 self.finish()
339 self.finish()
340
340
341
341
342 class KernelActionHandler(AuthenticatedHandler):
342 class KernelActionHandler(AuthenticatedHandler):
343
343
344 @web.authenticated
344 @web.authenticated
345 def post(self, kernel_id, action):
345 def post(self, kernel_id, action):
346 km = self.application.kernel_manager
346 km = self.application.kernel_manager
347 if action == 'interrupt':
347 if action == 'interrupt':
348 km.interrupt_kernel(kernel_id)
348 km.interrupt_kernel(kernel_id)
349 self.set_status(204)
349 self.set_status(204)
350 if action == 'restart':
350 if action == 'restart':
351 new_kernel_id = km.restart_kernel(kernel_id)
351 new_kernel_id = km.restart_kernel(kernel_id)
352 data = {'ws_url':self.ws_url,'kernel_id':new_kernel_id}
352 data = {'ws_url':self.ws_url,'kernel_id':new_kernel_id}
353 self.set_header('Location', '/'+new_kernel_id)
353 self.set_header('Location', '/'+new_kernel_id)
354 self.write(jsonapi.dumps(data))
354 self.write(jsonapi.dumps(data))
355 self.finish()
355 self.finish()
356
356
357
357
358 class ZMQStreamHandler(websocket.WebSocketHandler):
358 class ZMQStreamHandler(websocket.WebSocketHandler):
359
359
360 def _reserialize_reply(self, msg_list):
360 def _reserialize_reply(self, msg_list):
361 """Reserialize a reply message using JSON.
361 """Reserialize a reply message using JSON.
362
362
363 This takes the msg list from the ZMQ socket, unserializes it using
363 This takes the msg list from the ZMQ socket, unserializes it using
364 self.session and then serializes the result using JSON. This method
364 self.session and then serializes the result using JSON. This method
365 should be used by self._on_zmq_reply to build messages that can
365 should be used by self._on_zmq_reply to build messages that can
366 be sent back to the browser.
366 be sent back to the browser.
367 """
367 """
368 idents, msg_list = self.session.feed_identities(msg_list)
368 idents, msg_list = self.session.feed_identities(msg_list)
369 msg = self.session.unserialize(msg_list)
369 msg = self.session.unserialize(msg_list)
370 try:
370 try:
371 msg['header'].pop('date')
371 msg['header'].pop('date')
372 except KeyError:
372 except KeyError:
373 pass
373 pass
374 try:
374 try:
375 msg['parent_header'].pop('date')
375 msg['parent_header'].pop('date')
376 except KeyError:
376 except KeyError:
377 pass
377 pass
378 msg.pop('buffers')
378 msg.pop('buffers')
379 return jsonapi.dumps(msg)
379 return jsonapi.dumps(msg)
380
380
381 def _on_zmq_reply(self, msg_list):
381 def _on_zmq_reply(self, msg_list):
382 try:
382 try:
383 msg = self._reserialize_reply(msg_list)
383 msg = self._reserialize_reply(msg_list)
384 except:
384 except:
385 self.application.log.critical("Malformed message: %r" % msg_list)
385 self.application.log.critical("Malformed message: %r" % msg_list)
386 else:
386 else:
387 self.write_message(msg)
387 self.write_message(msg)
388
388
389
389
390 class AuthenticatedZMQStreamHandler(ZMQStreamHandler):
390 class AuthenticatedZMQStreamHandler(ZMQStreamHandler):
391
391
392 def open(self, kernel_id):
392 def open(self, kernel_id):
393 self.kernel_id = kernel_id.decode('ascii')
393 self.kernel_id = kernel_id.decode('ascii')
394 try:
394 try:
395 cfg = self.application.ipython_app.config
395 cfg = self.application.ipython_app.config
396 except AttributeError:
396 except AttributeError:
397 # protect from the case where this is run from something other than
397 # protect from the case where this is run from something other than
398 # the notebook app:
398 # the notebook app:
399 cfg = None
399 cfg = None
400 self.session = Session(config=cfg)
400 self.session = Session(config=cfg)
401 self.save_on_message = self.on_message
401 self.save_on_message = self.on_message
402 self.on_message = self.on_first_message
402 self.on_message = self.on_first_message
403
403
404 def get_current_user(self):
404 def get_current_user(self):
405 user_id = self.get_secure_cookie("username")
405 user_id = self.get_secure_cookie("username")
406 if user_id == '' or (user_id is None and not self.application.password):
406 if user_id == '' or (user_id is None and not self.application.password):
407 user_id = 'anonymous'
407 user_id = 'anonymous'
408 return user_id
408 return user_id
409
409
410 def _inject_cookie_message(self, msg):
410 def _inject_cookie_message(self, msg):
411 """Inject the first message, which is the document cookie,
411 """Inject the first message, which is the document cookie,
412 for authentication."""
412 for authentication."""
413 if isinstance(msg, unicode):
413 if isinstance(msg, unicode):
414 # Cookie can't constructor doesn't accept unicode strings for some reason
414 # Cookie can't constructor doesn't accept unicode strings for some reason
415 msg = msg.encode('utf8', 'replace')
415 msg = msg.encode('utf8', 'replace')
416 try:
416 try:
417 self.request._cookies = Cookie.SimpleCookie(msg)
417 self.request._cookies = Cookie.SimpleCookie(msg)
418 except:
418 except:
419 logging.warn("couldn't parse cookie string: %s",msg, exc_info=True)
419 logging.warn("couldn't parse cookie string: %s",msg, exc_info=True)
420
420
421 def on_first_message(self, msg):
421 def on_first_message(self, msg):
422 self._inject_cookie_message(msg)
422 self._inject_cookie_message(msg)
423 if self.get_current_user() is None:
423 if self.get_current_user() is None:
424 logging.warn("Couldn't authenticate WebSocket connection")
424 logging.warn("Couldn't authenticate WebSocket connection")
425 raise web.HTTPError(403)
425 raise web.HTTPError(403)
426 self.on_message = self.save_on_message
426 self.on_message = self.save_on_message
427
427
428
428
429 class IOPubHandler(AuthenticatedZMQStreamHandler):
429 class IOPubHandler(AuthenticatedZMQStreamHandler):
430
430
431 def initialize(self, *args, **kwargs):
431 def initialize(self, *args, **kwargs):
432 self._kernel_alive = True
432 self._kernel_alive = True
433 self._beating = False
433 self._beating = False
434 self.iopub_stream = None
434 self.iopub_stream = None
435 self.hb_stream = None
435 self.hb_stream = None
436
436
437 def on_first_message(self, msg):
437 def on_first_message(self, msg):
438 try:
438 try:
439 super(IOPubHandler, self).on_first_message(msg)
439 super(IOPubHandler, self).on_first_message(msg)
440 except web.HTTPError:
440 except web.HTTPError:
441 self.close()
441 self.close()
442 return
442 return
443 km = self.application.kernel_manager
443 km = self.application.kernel_manager
444 self.time_to_dead = km.time_to_dead
444 self.time_to_dead = km.time_to_dead
445 self.first_beat = km.first_beat
445 self.first_beat = km.first_beat
446 kernel_id = self.kernel_id
446 kernel_id = self.kernel_id
447 try:
447 try:
448 self.iopub_stream = km.create_iopub_stream(kernel_id)
448 self.iopub_stream = km.create_iopub_stream(kernel_id)
449 self.hb_stream = km.create_hb_stream(kernel_id)
449 self.hb_stream = km.create_hb_stream(kernel_id)
450 except web.HTTPError:
450 except web.HTTPError:
451 # WebSockets don't response to traditional error codes so we
451 # WebSockets don't response to traditional error codes so we
452 # close the connection.
452 # close the connection.
453 if not self.stream.closed():
453 if not self.stream.closed():
454 self.stream.close()
454 self.stream.close()
455 self.close()
455 self.close()
456 else:
456 else:
457 self.iopub_stream.on_recv(self._on_zmq_reply)
457 self.iopub_stream.on_recv(self._on_zmq_reply)
458 self.start_hb(self.kernel_died)
458 self.start_hb(self.kernel_died)
459
459
460 def on_message(self, msg):
460 def on_message(self, msg):
461 pass
461 pass
462
462
463 def on_close(self):
463 def on_close(self):
464 # This method can be called twice, once by self.kernel_died and once
464 # This method can be called twice, once by self.kernel_died and once
465 # from the WebSocket close event. If the WebSocket connection is
465 # from the WebSocket close event. If the WebSocket connection is
466 # closed before the ZMQ streams are setup, they could be None.
466 # closed before the ZMQ streams are setup, they could be None.
467 self.stop_hb()
467 self.stop_hb()
468 if self.iopub_stream is not None and not self.iopub_stream.closed():
468 if self.iopub_stream is not None and not self.iopub_stream.closed():
469 self.iopub_stream.on_recv(None)
469 self.iopub_stream.on_recv(None)
470 self.iopub_stream.close()
470 self.iopub_stream.close()
471 if self.hb_stream is not None and not self.hb_stream.closed():
471 if self.hb_stream is not None and not self.hb_stream.closed():
472 self.hb_stream.close()
472 self.hb_stream.close()
473
473
474 def start_hb(self, callback):
474 def start_hb(self, callback):
475 """Start the heartbeating and call the callback if the kernel dies."""
475 """Start the heartbeating and call the callback if the kernel dies."""
476 if not self._beating:
476 if not self._beating:
477 self._kernel_alive = True
477 self._kernel_alive = True
478
478
479 def ping_or_dead():
479 def ping_or_dead():
480 self.hb_stream.flush()
480 self.hb_stream.flush()
481 if self._kernel_alive:
481 if self._kernel_alive:
482 self._kernel_alive = False
482 self._kernel_alive = False
483 self.hb_stream.send(b'ping')
483 self.hb_stream.send(b'ping')
484 # flush stream to force immediate socket send
485 self.hb_stream.flush()
484 else:
486 else:
485 try:
487 try:
486 callback()
488 callback()
487 except:
489 except:
488 pass
490 pass
489 finally:
491 finally:
490 self.stop_hb()
492 self.stop_hb()
491
493
492 def beat_received(msg):
494 def beat_received(msg):
493 self._kernel_alive = True
495 self._kernel_alive = True
494
496
495 self.hb_stream.on_recv(beat_received)
497 self.hb_stream.on_recv(beat_received)
496 loop = ioloop.IOLoop.instance()
498 loop = ioloop.IOLoop.instance()
497 self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000, loop)
499 self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000, loop)
498 loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
500 loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
499 self._beating= True
501 self._beating= True
500
502
501 def _really_start_hb(self):
503 def _really_start_hb(self):
502 """callback for delayed heartbeat start
504 """callback for delayed heartbeat start
503
505
504 Only start the hb loop if we haven't been closed during the wait.
506 Only start the hb loop if we haven't been closed during the wait.
505 """
507 """
506 if self._beating and not self.hb_stream.closed():
508 if self._beating and not self.hb_stream.closed():
507 self._hb_periodic_callback.start()
509 self._hb_periodic_callback.start()
508
510
509 def stop_hb(self):
511 def stop_hb(self):
510 """Stop the heartbeating and cancel all related callbacks."""
512 """Stop the heartbeating and cancel all related callbacks."""
511 if self._beating:
513 if self._beating:
512 self._beating = False
514 self._beating = False
513 self._hb_periodic_callback.stop()
515 self._hb_periodic_callback.stop()
514 if not self.hb_stream.closed():
516 if not self.hb_stream.closed():
515 self.hb_stream.on_recv(None)
517 self.hb_stream.on_recv(None)
516
518
517 def kernel_died(self):
519 def kernel_died(self):
518 self.application.kernel_manager.delete_mapping_for_kernel(self.kernel_id)
520 self.application.kernel_manager.delete_mapping_for_kernel(self.kernel_id)
519 self.application.log.error("Kernel %s failed to respond to heartbeat", self.kernel_id)
521 self.application.log.error("Kernel %s failed to respond to heartbeat", self.kernel_id)
520 self.write_message(
522 self.write_message(
521 {'header': {'msg_type': 'status'},
523 {'header': {'msg_type': 'status'},
522 'parent_header': {},
524 'parent_header': {},
523 'content': {'execution_state':'dead'}
525 'content': {'execution_state':'dead'}
524 }
526 }
525 )
527 )
526 self.on_close()
528 self.on_close()
527
529
528
530
529 class ShellHandler(AuthenticatedZMQStreamHandler):
531 class ShellHandler(AuthenticatedZMQStreamHandler):
530
532
531 def initialize(self, *args, **kwargs):
533 def initialize(self, *args, **kwargs):
532 self.shell_stream = None
534 self.shell_stream = None
533
535
534 def on_first_message(self, msg):
536 def on_first_message(self, msg):
535 try:
537 try:
536 super(ShellHandler, self).on_first_message(msg)
538 super(ShellHandler, self).on_first_message(msg)
537 except web.HTTPError:
539 except web.HTTPError:
538 self.close()
540 self.close()
539 return
541 return
540 km = self.application.kernel_manager
542 km = self.application.kernel_manager
541 self.max_msg_size = km.max_msg_size
543 self.max_msg_size = km.max_msg_size
542 kernel_id = self.kernel_id
544 kernel_id = self.kernel_id
543 try:
545 try:
544 self.shell_stream = km.create_shell_stream(kernel_id)
546 self.shell_stream = km.create_shell_stream(kernel_id)
545 except web.HTTPError:
547 except web.HTTPError:
546 # WebSockets don't response to traditional error codes so we
548 # WebSockets don't response to traditional error codes so we
547 # close the connection.
549 # close the connection.
548 if not self.stream.closed():
550 if not self.stream.closed():
549 self.stream.close()
551 self.stream.close()
550 self.close()
552 self.close()
551 else:
553 else:
552 self.shell_stream.on_recv(self._on_zmq_reply)
554 self.shell_stream.on_recv(self._on_zmq_reply)
553
555
554 def on_message(self, msg):
556 def on_message(self, msg):
555 if len(msg) < self.max_msg_size:
557 if len(msg) < self.max_msg_size:
556 msg = jsonapi.loads(msg)
558 msg = jsonapi.loads(msg)
557 self.session.send(self.shell_stream, msg)
559 self.session.send(self.shell_stream, msg)
558
560
559 def on_close(self):
561 def on_close(self):
560 # Make sure the stream exists and is not already closed.
562 # Make sure the stream exists and is not already closed.
561 if self.shell_stream is not None and not self.shell_stream.closed():
563 if self.shell_stream is not None and not self.shell_stream.closed():
562 self.shell_stream.close()
564 self.shell_stream.close()
563
565
564
566
565 #-----------------------------------------------------------------------------
567 #-----------------------------------------------------------------------------
566 # Notebook web service handlers
568 # Notebook web service handlers
567 #-----------------------------------------------------------------------------
569 #-----------------------------------------------------------------------------
568
570
569 class NotebookRootHandler(AuthenticatedHandler):
571 class NotebookRootHandler(AuthenticatedHandler):
570
572
571 @authenticate_unless_readonly
573 @authenticate_unless_readonly
572 def get(self):
574 def get(self):
573
575
574 nbm = self.application.notebook_manager
576 nbm = self.application.notebook_manager
575 files = nbm.list_notebooks()
577 files = nbm.list_notebooks()
576 self.finish(jsonapi.dumps(files))
578 self.finish(jsonapi.dumps(files))
577
579
578 @web.authenticated
580 @web.authenticated
579 def post(self):
581 def post(self):
580 nbm = self.application.notebook_manager
582 nbm = self.application.notebook_manager
581 body = self.request.body.strip()
583 body = self.request.body.strip()
582 format = self.get_argument('format', default='json')
584 format = self.get_argument('format', default='json')
583 name = self.get_argument('name', default=None)
585 name = self.get_argument('name', default=None)
584 if body:
586 if body:
585 notebook_id = nbm.save_new_notebook(body, name=name, format=format)
587 notebook_id = nbm.save_new_notebook(body, name=name, format=format)
586 else:
588 else:
587 notebook_id = nbm.new_notebook()
589 notebook_id = nbm.new_notebook()
588 self.set_header('Location', '/'+notebook_id)
590 self.set_header('Location', '/'+notebook_id)
589 self.finish(jsonapi.dumps(notebook_id))
591 self.finish(jsonapi.dumps(notebook_id))
590
592
591
593
592 class NotebookHandler(AuthenticatedHandler):
594 class NotebookHandler(AuthenticatedHandler):
593
595
594 SUPPORTED_METHODS = ('GET', 'PUT', 'DELETE')
596 SUPPORTED_METHODS = ('GET', 'PUT', 'DELETE')
595
597
596 @authenticate_unless_readonly
598 @authenticate_unless_readonly
597 def get(self, notebook_id):
599 def get(self, notebook_id):
598 nbm = self.application.notebook_manager
600 nbm = self.application.notebook_manager
599 format = self.get_argument('format', default='json')
601 format = self.get_argument('format', default='json')
600 last_mod, name, data = nbm.get_notebook(notebook_id, format)
602 last_mod, name, data = nbm.get_notebook(notebook_id, format)
601
603
602 if format == u'json':
604 if format == u'json':
603 self.set_header('Content-Type', 'application/json')
605 self.set_header('Content-Type', 'application/json')
604 self.set_header('Content-Disposition','attachment; filename="%s.ipynb"' % name)
606 self.set_header('Content-Disposition','attachment; filename="%s.ipynb"' % name)
605 elif format == u'py':
607 elif format == u'py':
606 self.set_header('Content-Type', 'application/x-python')
608 self.set_header('Content-Type', 'application/x-python')
607 self.set_header('Content-Disposition','attachment; filename="%s.py"' % name)
609 self.set_header('Content-Disposition','attachment; filename="%s.py"' % name)
608 self.set_header('Last-Modified', last_mod)
610 self.set_header('Last-Modified', last_mod)
609 self.finish(data)
611 self.finish(data)
610
612
611 @web.authenticated
613 @web.authenticated
612 def put(self, notebook_id):
614 def put(self, notebook_id):
613 nbm = self.application.notebook_manager
615 nbm = self.application.notebook_manager
614 format = self.get_argument('format', default='json')
616 format = self.get_argument('format', default='json')
615 name = self.get_argument('name', default=None)
617 name = self.get_argument('name', default=None)
616 nbm.save_notebook(notebook_id, self.request.body, name=name, format=format)
618 nbm.save_notebook(notebook_id, self.request.body, name=name, format=format)
617 self.set_status(204)
619 self.set_status(204)
618 self.finish()
620 self.finish()
619
621
620 @web.authenticated
622 @web.authenticated
621 def delete(self, notebook_id):
623 def delete(self, notebook_id):
622 nbm = self.application.notebook_manager
624 nbm = self.application.notebook_manager
623 nbm.delete_notebook(notebook_id)
625 nbm.delete_notebook(notebook_id)
624 self.set_status(204)
626 self.set_status(204)
625 self.finish()
627 self.finish()
626
628
627
629
628 class NotebookCopyHandler(AuthenticatedHandler):
630 class NotebookCopyHandler(AuthenticatedHandler):
629
631
630 @web.authenticated
632 @web.authenticated
631 def get(self, notebook_id):
633 def get(self, notebook_id):
632 nbm = self.application.notebook_manager
634 nbm = self.application.notebook_manager
633 project = nbm.notebook_dir
635 project = nbm.notebook_dir
634 notebook_id = nbm.copy_notebook(notebook_id)
636 notebook_id = nbm.copy_notebook(notebook_id)
635 self.render(
637 self.render(
636 'notebook.html', project=project,
638 'notebook.html', project=project,
637 notebook_id=notebook_id,
639 notebook_id=notebook_id,
638 base_project_url=u'/', base_kernel_url=u'/',
640 base_project_url=u'/', base_kernel_url=u'/',
639 kill_kernel=False,
641 kill_kernel=False,
640 read_only=False,
642 read_only=False,
641 logged_in=self.logged_in,
643 logged_in=self.logged_in,
642 login_available=self.login_available,
644 login_available=self.login_available,
643 mathjax_url=self.application.ipython_app.mathjax_url,
645 mathjax_url=self.application.ipython_app.mathjax_url,
644 )
646 )
645
647
646 #-----------------------------------------------------------------------------
648 #-----------------------------------------------------------------------------
647 # RST web service handlers
649 # RST web service handlers
648 #-----------------------------------------------------------------------------
650 #-----------------------------------------------------------------------------
649
651
650
652
651 class RSTHandler(AuthenticatedHandler):
653 class RSTHandler(AuthenticatedHandler):
652
654
653 @web.authenticated
655 @web.authenticated
654 def post(self):
656 def post(self):
655 if publish_string is None:
657 if publish_string is None:
656 raise web.HTTPError(503, u'docutils not available')
658 raise web.HTTPError(503, u'docutils not available')
657 body = self.request.body.strip()
659 body = self.request.body.strip()
658 source = body
660 source = body
659 # template_path=os.path.join(os.path.dirname(__file__), u'templates', u'rst_template.html')
661 # template_path=os.path.join(os.path.dirname(__file__), u'templates', u'rst_template.html')
660 defaults = {'file_insertion_enabled': 0,
662 defaults = {'file_insertion_enabled': 0,
661 'raw_enabled': 0,
663 'raw_enabled': 0,
662 '_disable_config': 1,
664 '_disable_config': 1,
663 'stylesheet_path': 0
665 'stylesheet_path': 0
664 # 'template': template_path
666 # 'template': template_path
665 }
667 }
666 try:
668 try:
667 html = publish_string(source, writer_name='html',
669 html = publish_string(source, writer_name='html',
668 settings_overrides=defaults
670 settings_overrides=defaults
669 )
671 )
670 except:
672 except:
671 raise web.HTTPError(400, u'Invalid RST')
673 raise web.HTTPError(400, u'Invalid RST')
672 print html
674 print html
673 self.set_header('Content-Type', 'text/html')
675 self.set_header('Content-Type', 'text/html')
674 self.finish(html)
676 self.finish(html)
675
677
676
678
@@ -1,178 +1,180 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18 import time
18 import time
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice
22 from zmq.devices import ThreadDevice
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27
27
28 from IPython.parallel.util import asbytes
28 from IPython.parallel.util import asbytes
29
29
30 class Heart(object):
30 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
32 This is a simple wrapper with defaults for the most common
32 This is a simple wrapper with defaults for the most common
33 Device model for responding to heartbeats.
33 Device model for responding to heartbeats.
34
34
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 SUB/XREQ for in/out.
36 SUB/XREQ for in/out.
37
37
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 device=None
39 device=None
40 id=None
40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 # do not allow the device to share global Context.instance,
43 # do not allow the device to share global Context.instance,
44 # which is the default behavior in pyzmq > 2.1.10
44 # which is the default behavior in pyzmq > 2.1.10
45 self.device.context_factory = zmq.Context
45 self.device.context_factory = zmq.Context
46
46
47 self.device.daemon=True
47 self.device.daemon=True
48 self.device.connect_in(in_addr)
48 self.device.connect_in(in_addr)
49 self.device.connect_out(out_addr)
49 self.device.connect_out(out_addr)
50 if in_type == zmq.SUB:
50 if in_type == zmq.SUB:
51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
51 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
52 if heart_id is None:
52 if heart_id is None:
53 heart_id = uuid.uuid4().bytes
53 heart_id = uuid.uuid4().bytes
54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
54 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
55 self.id = heart_id
55 self.id = heart_id
56
56
57 def start(self):
57 def start(self):
58 return self.device.start()
58 return self.device.start()
59
59
60
60
61 class HeartMonitor(LoggingConfigurable):
61 class HeartMonitor(LoggingConfigurable):
62 """A basic HeartMonitor class
62 """A basic HeartMonitor class
63 pingstream: a PUB stream
63 pingstream: a PUB stream
64 pongstream: an XREP stream
64 pongstream: an XREP stream
65 period: the period of the heartbeat in milliseconds"""
65 period: the period of the heartbeat in milliseconds"""
66
66
67 period = Integer(1000, config=True,
67 period = Integer(3000, config=True,
68 help='The frequency at which the Hub pings the engines for heartbeats '
68 help='The frequency at which the Hub pings the engines for heartbeats '
69 '(in ms)',
69 '(in ms)',
70 )
70 )
71
71
72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
72 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
73 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
74 loop = Instance('zmq.eventloop.ioloop.IOLoop')
75 def _loop_default(self):
75 def _loop_default(self):
76 return ioloop.IOLoop.instance()
76 return ioloop.IOLoop.instance()
77
77
78 # not settable:
78 # not settable:
79 hearts=Set()
79 hearts=Set()
80 responses=Set()
80 responses=Set()
81 on_probation=Set()
81 on_probation=Set()
82 last_ping=CFloat(0)
82 last_ping=CFloat(0)
83 _new_handlers = Set()
83 _new_handlers = Set()
84 _failure_handlers = Set()
84 _failure_handlers = Set()
85 lifetime = CFloat(0)
85 lifetime = CFloat(0)
86 tic = CFloat(0)
86 tic = CFloat(0)
87
87
88 def __init__(self, **kwargs):
88 def __init__(self, **kwargs):
89 super(HeartMonitor, self).__init__(**kwargs)
89 super(HeartMonitor, self).__init__(**kwargs)
90
90
91 self.pongstream.on_recv(self.handle_pong)
91 self.pongstream.on_recv(self.handle_pong)
92
92
93 def start(self):
93 def start(self):
94 self.tic = time.time()
94 self.tic = time.time()
95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
95 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
96 self.caller.start()
96 self.caller.start()
97
97
98 def add_new_heart_handler(self, handler):
98 def add_new_heart_handler(self, handler):
99 """add a new handler for new hearts"""
99 """add a new handler for new hearts"""
100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
100 self.log.debug("heartbeat::new_heart_handler: %s", handler)
101 self._new_handlers.add(handler)
101 self._new_handlers.add(handler)
102
102
103 def add_heart_failure_handler(self, handler):
103 def add_heart_failure_handler(self, handler):
104 """add a new handler for heart failure"""
104 """add a new handler for heart failure"""
105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
105 self.log.debug("heartbeat::new heart failure handler: %s", handler)
106 self._failure_handlers.add(handler)
106 self._failure_handlers.add(handler)
107
107
108 def beat(self):
108 def beat(self):
109 self.pongstream.flush()
109 self.pongstream.flush()
110 self.last_ping = self.lifetime
110 self.last_ping = self.lifetime
111
111
112 toc = time.time()
112 toc = time.time()
113 self.lifetime += toc-self.tic
113 self.lifetime += toc-self.tic
114 self.tic = toc
114 self.tic = toc
115 self.log.debug("heartbeat::sending %s", self.lifetime)
115 self.log.debug("heartbeat::sending %s", self.lifetime)
116 goodhearts = self.hearts.intersection(self.responses)
116 goodhearts = self.hearts.intersection(self.responses)
117 missed_beats = self.hearts.difference(goodhearts)
117 missed_beats = self.hearts.difference(goodhearts)
118 heartfailures = self.on_probation.intersection(missed_beats)
118 heartfailures = self.on_probation.intersection(missed_beats)
119 newhearts = self.responses.difference(goodhearts)
119 newhearts = self.responses.difference(goodhearts)
120 map(self.handle_new_heart, newhearts)
120 map(self.handle_new_heart, newhearts)
121 map(self.handle_heart_failure, heartfailures)
121 map(self.handle_heart_failure, heartfailures)
122 self.on_probation = missed_beats.intersection(self.hearts)
122 self.on_probation = missed_beats.intersection(self.hearts)
123 self.responses = set()
123 self.responses = set()
124 # print self.on_probation, self.hearts
124 # print self.on_probation, self.hearts
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 self.pingstream.send(asbytes(str(self.lifetime)))
126 self.pingstream.send(asbytes(str(self.lifetime)))
127 # flush stream to force immediate socket send
128 self.pingstream.flush()
127
129
128 def handle_new_heart(self, heart):
130 def handle_new_heart(self, heart):
129 if self._new_handlers:
131 if self._new_handlers:
130 for handler in self._new_handlers:
132 for handler in self._new_handlers:
131 handler(heart)
133 handler(heart)
132 else:
134 else:
133 self.log.info("heartbeat::yay, got new heart %s!", heart)
135 self.log.info("heartbeat::yay, got new heart %s!", heart)
134 self.hearts.add(heart)
136 self.hearts.add(heart)
135
137
136 def handle_heart_failure(self, heart):
138 def handle_heart_failure(self, heart):
137 if self._failure_handlers:
139 if self._failure_handlers:
138 for handler in self._failure_handlers:
140 for handler in self._failure_handlers:
139 try:
141 try:
140 handler(heart)
142 handler(heart)
141 except Exception as e:
143 except Exception as e:
142 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
144 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
143 pass
145 pass
144 else:
146 else:
145 self.log.info("heartbeat::Heart %s failed :(", heart)
147 self.log.info("heartbeat::Heart %s failed :(", heart)
146 self.hearts.remove(heart)
148 self.hearts.remove(heart)
147
149
148
150
149 def handle_pong(self, msg):
151 def handle_pong(self, msg):
150 "a heart just beat"
152 "a heart just beat"
151 current = asbytes(str(self.lifetime))
153 current = asbytes(str(self.lifetime))
152 last = asbytes(str(self.last_ping))
154 last = asbytes(str(self.last_ping))
153 if msg[1] == current:
155 if msg[1] == current:
154 delta = time.time()-self.tic
156 delta = time.time()-self.tic
155 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
157 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
156 self.responses.add(msg[0])
158 self.responses.add(msg[0])
157 elif msg[1] == last:
159 elif msg[1] == last:
158 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
160 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
159 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
161 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
160 self.responses.add(msg[0])
162 self.responses.add(msg[0])
161 else:
163 else:
162 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
164 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
163
165
164
166
165 if __name__ == '__main__':
167 if __name__ == '__main__':
166 loop = ioloop.IOLoop.instance()
168 loop = ioloop.IOLoop.instance()
167 context = zmq.Context()
169 context = zmq.Context()
168 pub = context.socket(zmq.PUB)
170 pub = context.socket(zmq.PUB)
169 pub.bind('tcp://127.0.0.1:5555')
171 pub.bind('tcp://127.0.0.1:5555')
170 xrep = context.socket(zmq.ROUTER)
172 xrep = context.socket(zmq.ROUTER)
171 xrep.bind('tcp://127.0.0.1:5556')
173 xrep.bind('tcp://127.0.0.1:5556')
172
174
173 outstream = zmqstream.ZMQStream(pub, loop)
175 outstream = zmqstream.ZMQStream(pub, loop)
174 instream = zmqstream.ZMQStream(xrep, loop)
176 instream = zmqstream.ZMQStream(xrep, loop)
175
177
176 hb = HeartMonitor(loop, outstream, instream)
178 hb = HeartMonitor(loop, outstream, instream)
177
179
178 loop.start()
180 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now