##// END OF EJS Templates
Work to adapt routers to new Session message protocol.
Brian E. Granger -
Show More
@@ -1,163 +1,116 b''
1 """Tornado handlers for the notebook."""
2
3 #-----------------------------------------------------------------------------
4 # Imports
5 #-----------------------------------------------------------------------------
6
1 7 import datetime
2 8 import json
3 9 import logging
4 10 import os
5 11 import urllib
6 import uuid
7 from Queue import Queue
8 12
9 13 from tornado import web
10 14 from tornado import websocket
11 15
16 #-----------------------------------------------------------------------------
17 # Handlers
18 #-----------------------------------------------------------------------------
12 19
13 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
14 _kernel_action_regex = r"(?P<action>restart|interrupt)"
15 20
16 21 class MainHandler(web.RequestHandler):
17 22 def get(self):
18 23 self.render('notebook.html')
19 24
20 25
21 26 class KernelHandler(web.RequestHandler):
22 27
23 28 def get(self):
24 29 self.write(json.dumps(self.application.kernel_ids))
25 30
26 31 def post(self):
27 32 kernel_id = self.application.start_kernel()
28 33 self.write(json.dumps(kernel_id))
29 34
30 35
31 36 class KernelActionHandler(web.RequestHandler):
32 37
33 38 def post(self, kernel_id, action):
34 39 # TODO: figure out a better way of handling RPC style calls.
35 40 if action == 'interrupt':
36 41 self.application.interrupt_kernel(kernel_id)
37 42 if action == 'restart':
38 43 new_kernel_id = self.application.restart_kernel(kernel_id)
39 44 self.write(json.dumps(new_kernel_id))
40 45
41 46
42 class ZMQStreamRouter(object):
43
44 def __init__(self, zmq_stream):
45 self.zmq_stream = zmq_stream
46 self._clients = {}
47 self.zmq_stream.on_recv(self._on_zmq_reply)
48
49 def register_client(self, client):
50 client_id = uuid.uuid4()
51 self._clients[client_id] = client
52 return client_id
53
54 def unregister_client(self, client_id):
55 del self._clients[client_id]
56
57 def copy_clients(self, router):
58 # Copy the clients of another router.
59 for client_id, client in router._clients.items():
60 client.router = self
61 self._clients[client_id] = client
62
63
64 class IOPubStreamRouter(ZMQStreamRouter):
65
66 def _on_zmq_reply(self, msg_list):
67 for client_id, client in self._clients.items():
68 for msg in msg_list:
69 client.write_message(msg)
70
71 def forward_unicode(self, client_id, msg):
72 # This is a SUB stream that we should never write to.
73 pass
74
75
76 class ShellStreamRouter(ZMQStreamRouter):
77
78 def __init__(self, zmq_stream):
79 ZMQStreamRouter.__init__(self, zmq_stream)
80 self._request_queue = Queue()
81
82 def _on_zmq_reply(self, msg_list):
83 client_id = self._request_queue.get(block=False)
84 client = self._clients.get(client_id)
85 if client is not None:
86 for msg in msg_list:
87 client.write_message(msg)
88
89 def forward_unicode(self, client_id, msg):
90 self._request_queue.put(client_id)
91 self.zmq_stream.send_unicode(msg)
92
93
94 47 class ZMQStreamHandler(websocket.WebSocketHandler):
95 48
96 49 def initialize(self, stream_name):
97 50 self.stream_name = stream_name
98 51
99 52 def open(self, kernel_id):
100 53 self.router = self.application.get_router(kernel_id, self.stream_name)
101 54 self.client_id = self.router.register_client(self)
102 55 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
103 56
104 57 def on_message(self, msg):
105 58 self.router.forward_unicode(self.client_id, msg)
106 59
107 60 def on_close(self):
108 61 self.router.unregister_client(self.client_id)
109 62 logging.info("Connection closed: %s" % self.client_id)
110 63
111 64
112 65 class NotebookRootHandler(web.RequestHandler):
113 66
114 67 def get(self):
115 68 files = os.listdir(os.getcwd())
116 69 files = [file for file in files if file.endswith(".ipynb")]
117 70 self.write(json.dumps(files))
118 71
119 72
120 73 class NotebookHandler(web.RequestHandler):
121 74
122 75 SUPPORTED_METHODS = ("GET", "DELETE", "PUT")
123 76
124 77 def find_path(self, filename):
125 78 filename = urllib.unquote(filename)
126 79 if not filename.endswith('.ipynb'):
127 80 raise web.HTTPError(400)
128 81 path = os.path.join(os.getcwd(), filename)
129 82 return path
130 83
131 84 def get(self, filename):
132 85 path = self.find_path(filename)
133 86 if not os.path.isfile(path):
134 87 raise web.HTTPError(404)
135 88 info = os.stat(path)
136 89 self.set_header("Content-Type", "application/unknown")
137 90 self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(
138 91 info.st_mtime))
139 92 f = open(path, "r")
140 93 try:
141 94 self.finish(f.read())
142 95 finally:
143 96 f.close()
144 97
145 98 def put(self, filename):
146 99 path = self.find_path(filename)
147 100 f = open(path, "w")
148 101 f.write(self.request.body)
149 102 f.close()
150 103 self.finish()
151 104
152 105 def delete(self, filename):
153 106 path = self.find_path(filename)
154 107 if not os.path.isfile(path):
155 108 raise web.HTTPError(404)
156 109 os.unlink(path)
157 110 self.set_status(204)
158 111 self.finish()
159 112
160 113
161 114
162 115
163 116
@@ -1,182 +1,181 b''
1 1 """A kernel manager for multiple kernels."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Imports
5 5 #-----------------------------------------------------------------------------
6 6
7 import logging
8 7 import signal
9 8 import sys
10 9 import uuid
11 10
12 11 import zmq
13 12
14 13 from IPython.config.configurable import LoggingConfigurable
15 14 from IPython.zmq.ipkernel import launch_kernel
16 from IPython.utils.traitlets import Instance, Dict, Unicode
15 from IPython.utils.traitlets import Instance, Dict
17 16
18 17 #-----------------------------------------------------------------------------
19 18 # Classes
20 19 #-----------------------------------------------------------------------------
21 20
22 21 class DuplicateKernelError(Exception):
23 22 pass
24 23
25 24
26 25 class KernelManager(LoggingConfigurable):
27 26 """A class for managing multiple kernels."""
28 27
29 28 context = Instance('zmq.Context')
30 29 def _context_default(self):
31 30 return zmq.Context.instance()
32 31
33 32 _kernels = Dict()
34 33
35 34 @property
36 35 def kernel_ids(self):
37 36 """Return a list of the kernel ids of the active kernels."""
38 37 return self._kernels.keys()
39 38
40 39 def __len__(self):
41 40 """Return the number of running kernels."""
42 41 return len(self.kernel_ids)
43 42
44 43 def __contains__(self, kernel_id):
45 44 if kernel_id in self.kernel_ids:
46 45 return True
47 46 else:
48 47 return False
49 48
50 49 def start_kernel(self, **kwargs):
51 50 """Start a new kernel."""
52 51 kernel_id = str(uuid.uuid4())
53 52 (process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(**kwargs)
54 53 # Store the information for contacting the kernel. This assumes the kernel is
55 54 # running on localhost.
56 55 d = dict(
57 56 process = process,
58 57 stdin_port = stdin_port,
59 58 iopub_port = iopub_port,
60 59 shell_port = shell_port,
61 60 hb_port = hb_port,
62 61 ip = '127.0.0.1'
63 62 )
64 63 self._kernels[kernel_id] = d
65 64 return kernel_id
66 65
67 66 def kill_kernel(self, kernel_id):
68 67 """Kill a kernel by its kernel uuid.
69 68
70 69 Parameters
71 70 ==========
72 71 kernel_id : uuid
73 72 The id of the kernel to kill.
74 73 """
75 74 kernel_process = self.get_kernel_process(kernel_id)
76 75 if kernel_process is not None:
77 76 # Attempt to kill the kernel.
78 77 try:
79 78 kernel_process.kill()
80 79 except OSError, e:
81 80 # In Windows, we will get an Access Denied error if the process
82 81 # has already terminated. Ignore it.
83 82 if not (sys.platform == 'win32' and e.winerror == 5):
84 83 raise
85 84 del self._kernels[kernel_id]
86 85
87 86 def interrupt_kernel(self, kernel_id):
88 87 """Interrupt (SIGINT) the kernel by its uuid.
89 88
90 89 Parameters
91 90 ==========
92 91 kernel_id : uuid
93 92 The id of the kernel to interrupt.
94 93 """
95 94 kernel_process = self.get_kernel_process(kernel_id)
96 95 if kernel_process is not None:
97 96 if sys.platform == 'win32':
98 97 from parentpoller import ParentPollerWindows as Poller
99 98 Poller.send_interrupt(kernel_process.win32_interrupt_event)
100 99 else:
101 100 kernel_process.send_signal(signal.SIGINT)
102 101
103 102 def signal_kernel(self, kernel_id, signum):
104 103 """ Sends a signal to the kernel by its uuid.
105 104
106 105 Note that since only SIGTERM is supported on Windows, this function
107 106 is only useful on Unix systems.
108 107
109 108 Parameters
110 109 ==========
111 110 kernel_id : uuid
112 111 The id of the kernel to signal.
113 112 """
114 113 kernel_process = self.get_kernel_process(kernel_id)
115 114 if kernel_process is not None:
116 115 kernel_process.send_signal(signum)
117 116
118 117 def get_kernel_process(self, kernel_id):
119 118 """Get the process object for a kernel by its uuid.
120 119
121 120 Parameters
122 121 ==========
123 122 kernel_id : uuid
124 123 The id of the kernel.
125 124 """
126 125 d = self._kernels.get(kernel_id)
127 126 if d is not None:
128 127 return d['process']
129 128 else:
130 129 raise KeyError("Kernel with id not found: %s" % kernel_id)
131 130
132 131 def get_kernel_ports(self, kernel_id):
133 132 """Return a dictionary of ports for a kernel.
134 133
135 134 Parameters
136 135 ==========
137 136 kernel_id : uuid
138 137 The id of the kernel.
139 138
140 139 Returns
141 140 =======
142 141 port_dict : dict
143 142 A dict of key, value pairs where the keys are the names
144 143 (stdin_port,iopub_port,shell_port) and the values are the
145 144 integer port numbers for those channels.
146 145 """
147 146 d = self._kernels.get(kernel_id)
148 147 if d is not None:
149 148 dcopy = d.copy()
150 149 dcopy.pop('process')
151 150 dcopy.pop('ip')
152 151 return dcopy
153 152 else:
154 153 raise KeyError("Kernel with id not found: %s" % kernel_id)
155 154
156 155 def get_kernel_ip(self, kernel_id):
157 156 """Return ip address for a kernel.
158 157
159 158 Parameters
160 159 ==========
161 160 kernel_id : uuid
162 161 The id of the kernel.
163 162
164 163 Returns
165 164 =======
166 165 ip : str
167 166 The ip address of the kernel.
168 167 """
169 168 d = self._kernels.get(kernel_id)
170 169 if d is not None:
171 170 return d['ip']
172 171 else:
173 172 raise KeyError("Kernel with id not found: %s" % kernel_id)
174 173
175 174 def create_session_manager(self, kernel_id):
176 175 """Create a new session manager for a kernel by its uuid."""
177 176 from sessionmanager import SessionManager
178 177 return SessionManager(
179 178 kernel_id=kernel_id, kernel_manager=self,
180 179 config=self.config, context=self.context, log=self.log
181 180 )
182 181
@@ -1,247 +1,249 b''
1 1 """A tornado based IPython notebook server."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Imports
5 5 #-----------------------------------------------------------------------------
6 6
7 7 import logging
8 8 import os
9 9 import signal
10 10 import sys
11 11
12 12 import zmq
13 13
14 14 # Install the pyzmq ioloop. This has to be done before anything else from
15 15 # tornado is imported.
16 16 from zmq.eventloop import ioloop
17 17 import tornado.ioloop
18 18 tornado.ioloop = ioloop
19 19
20 20 from tornado import httpserver
21 21 from tornado import web
22 22
23 23 from kernelmanager import KernelManager
24 24 from sessionmanager import SessionManager
25 25 from handlers import (
26 26 MainHandler, KernelHandler, KernelActionHandler, ZMQStreamHandler,
27 27 NotebookRootHandler, NotebookHandler
28 28 )
29 29 from routers import IOPubStreamRouter, ShellStreamRouter
30 30
31 31 from IPython.core.application import BaseIPythonApplication
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.frontend.qt.console.rich_ipython_widget import RichIPythonWidget
34 34 from IPython.zmq.session import Session
35 35 from IPython.zmq.zmqshell import ZMQInteractiveShell
36 36 from IPython.zmq.ipkernel import (
37 37 flags as ipkernel_flags,
38 38 aliases as ipkernel_aliases,
39 39 IPKernelApp
40 40 )
41 41 from IPython.utils.traitlets import Dict, Unicode, Int, Any, List, Enum
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Module globals
45 45 #-----------------------------------------------------------------------------
46 46
47 47 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
48 48 _kernel_action_regex = r"(?P<action>restart|interrupt)"
49 49
50 50 LOCALHOST = '127.0.0.1'
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # The Tornado web application
54 54 #-----------------------------------------------------------------------------
55 55
56 56 class NotebookWebApplication(web.Application):
57 57
58 58 def __init__(self, kernel_manager, log, kernel_argv):
59 59 handlers = [
60 60 (r"/", MainHandler),
61 61 (r"/kernels", KernelHandler),
62 62 (r"/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
63 63 (r"/kernels/%s/iopub" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='iopub')),
64 64 (r"/kernels/%s/shell" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='shell')),
65 65 (r"/notebooks", NotebookRootHandler),
66 66 (r"/notebooks/([^/]+)", NotebookHandler)
67 67 ]
68 68 settings = dict(
69 69 template_path=os.path.join(os.path.dirname(__file__), "templates"),
70 70 static_path=os.path.join(os.path.dirname(__file__), "static"),
71 71 )
72 72 web.Application.__init__(self, handlers, **settings)
73 73
74 74 self.kernel_manager = kernel_manager
75 75 self.log = log
76 76 self.kernel_argv = kernel_argv
77 77 self._routers = {}
78 78 self._session_dict = {}
79 79
80 80 #-------------------------------------------------------------------------
81 81 # Methods for managing kernels and sessions
82 82 #-------------------------------------------------------------------------
83 83
84 84 @property
85 85 def kernel_ids(self):
86 86 return self.kernel_manager.kernel_ids
87 87
88 88 def start_kernel(self):
89 89 kwargs = dict()
90 90 kwargs['extra_arguments'] = self.kernel_argv
91 91 kernel_id = self.kernel_manager.start_kernel(**kwargs)
92 92 self.log.info("Kernel started: %s" % kernel_id)
93 93 self.log.debug("Kernel args: %r" % kwargs)
94 94 self.start_session_manager(kernel_id)
95 95 return kernel_id
96 96
97 97 def start_session_manager(self, kernel_id):
98 98 sm = self.kernel_manager.create_session_manager(kernel_id)
99 99 self._session_dict[kernel_id] = sm
100 100 iopub_stream = sm.get_iopub_stream()
101 101 shell_stream = sm.get_shell_stream()
102 iopub_router = IOPubStreamRouter(iopub_stream)
103 shell_router = ShellStreamRouter(shell_stream)
102 iopub_router = IOPubStreamRouter(iopub_stream, sm.session)
103 shell_router = ShellStreamRouter(shell_stream, sm.session)
104 104 self._routers[(kernel_id, 'iopub')] = iopub_router
105 105 self._routers[(kernel_id, 'shell')] = shell_router
106 106
107 107 def kill_kernel(self, kernel_id):
108 108 sm = self._session_dict.pop(kernel_id)
109 109 sm.stop()
110 110 self.kernel_manager.kill_kernel(kernel_id)
111 111 self.log.info("Kernel killed: %s" % kernel_id)
112 112
113 113 def interrupt_kernel(self, kernel_id):
114 114 self.kernel_manager.interrupt_kernel(kernel_id)
115 115 self.log.debug("Kernel interrupted: %s" % kernel_id)
116 116
117 117 def restart_kernel(self, kernel_id):
118 118 # Create the new kernel first so we can move the clients over.
119 119 new_kernel_id = self.start_kernel()
120 120
121 121 # Copy the clients over to the new routers.
122 122 old_iopub_router = self.get_router(kernel_id, 'iopub')
123 123 old_shell_router = self.get_router(kernel_id, 'shell')
124 124 new_iopub_router = self.get_router(new_kernel_id, 'iopub')
125 125 new_shell_router = self.get_router(new_kernel_id, 'shell')
126 126 new_iopub_router.copy_clients(old_iopub_router)
127 127 new_shell_router.copy_clients(old_shell_router)
128 128
129 129 # Now shutdown the old session and the kernel.
130 130 # TODO: This causes a hard crash in ZMQStream.close, which sets
131 131 # self.socket to None to hastily. We will need to fix this in PyZMQ
132 132 # itself. For now, we just leave the old kernel running :(
133 133 # self.kill_kernel(kernel_id)
134 134
135 135 self.log.debug("Kernel restarted: %s -> %s" % (kernel_id, new_kernel_id))
136 136 return new_kernel_id
137 137
138 138 def get_router(self, kernel_id, stream_name):
139 139 router = self._routers[(kernel_id, stream_name)]
140 140 return router
141 141
142
143
142 144 #-----------------------------------------------------------------------------
143 145 # Aliases and Flags
144 146 #-----------------------------------------------------------------------------
145 147
146 148 flags = dict(ipkernel_flags)
147 149
148 150 # the flags that are specific to the frontend
149 151 # these must be scrubbed before being passed to the kernel,
150 152 # or it will raise an error on unrecognized flags
151 153 notebook_flags = []
152 154
153 155 aliases = dict(ipkernel_aliases)
154 156
155 157 aliases.update(dict(
156 158 ip = 'IPythonNotebookApp.ip',
157 159 port = 'IPythonNotebookApp.port',
158 160 colors = 'ZMQInteractiveShell.colors',
159 161 editor = 'RichIPythonWidget.editor',
160 162 ))
161 163
162 164 #-----------------------------------------------------------------------------
163 165 # IPythonNotebookApp
164 166 #-----------------------------------------------------------------------------
165 167
166 168 class IPythonNotebookApp(BaseIPythonApplication):
167 169 name = 'ipython-notebook'
168 170 default_config_file_name='ipython_notebook_config.py'
169 171
170 172 description = """
171 173 The IPython HTML Notebook.
172 174
173 175 This launches a Tornado based HTML Notebook Server that serves up an
174 176 HTML5/Javascript Notebook client.
175 177 """
176 178
177 179 classes = [IPKernelApp, ZMQInteractiveShell, ProfileDir, Session,
178 180 KernelManager, SessionManager, RichIPythonWidget]
179 181 flags = Dict(flags)
180 182 aliases = Dict(aliases)
181 183
182 184 kernel_argv = List(Unicode)
183 185
184 186 log_level = Enum((0,10,20,30,40,50,'DEBUG','INFO','WARN','ERROR','CRITICAL'),
185 187 default_value=logging.INFO,
186 188 config=True,
187 189 help="Set the log level by value or name.")
188 190
189 191 # connection info:
190 192 ip = Unicode(LOCALHOST, config=True,
191 193 help="The IP address the notebook server will listen on."
192 194 )
193 195
194 196 port = Int(8888, config=True,
195 197 help="The port the notebook server will listen on."
196 198 )
197 199
198 200 # the factory for creating a widget
199 201 widget_factory = Any(RichIPythonWidget)
200 202
201 203 def parse_command_line(self, argv=None):
202 204 super(IPythonNotebookApp, self).parse_command_line(argv)
203 205 if argv is None:
204 206 argv = sys.argv[1:]
205 207
206 208 self.kernel_argv = list(argv) # copy
207 209 # kernel should inherit default config file from frontend
208 210 self.kernel_argv.append("--KernelApp.parent_appname='%s'"%self.name)
209 211 # scrub frontend-specific flags
210 212 for a in argv:
211 213 if a.startswith('-') and a.lstrip('-') in notebook_flags:
212 214 self.kernel_argv.remove(a)
213 215
214 216 def init_kernel_manager(self):
215 217 # Don't let Qt or ZMQ swallow KeyboardInterupts.
216 218 signal.signal(signal.SIGINT, signal.SIG_DFL)
217 219
218 220 # Create a KernelManager and start a kernel.
219 221 self.kernel_manager = KernelManager(config=self.config, log=self.log)
220 222
221 223 def init_logging(self):
222 224 super(IPythonNotebookApp, self).init_logging()
223 225 # This prevents double log messages because tornado use a root logger that
224 226 # self.log is a child of. The logging module dipatches log messages to a log
225 227 # and all of its ancenstors until propagate is set to False.
226 228 self.log.propagate = False
227 229
228 230 def initialize(self, argv=None):
229 231 super(IPythonNotebookApp, self).initialize(argv)
230 232 self.init_kernel_manager()
231 233 self.web_app = NotebookWebApplication(self.kernel_manager, self.log, self.kernel_argv)
232 234 self.http_server = httpserver.HTTPServer(self.web_app)
233 235 self.http_server.listen(self.port)
234 236
235 237 def start(self):
236 238 self.log.info("The IPython Notebook is running at: http://%s:%i" % (self.ip, self.port))
237 239 ioloop.IOLoop.instance().start()
238 240
239 241 #-----------------------------------------------------------------------------
240 242 # Main entry point
241 243 #-----------------------------------------------------------------------------
242 244
243 245 def launch_new_instance():
244 246 app = IPythonNotebookApp()
245 247 app.initialize()
246 248 app.start()
247 249
@@ -1,57 +1,58 b''
1 1 import uuid
2 2 from Queue import Queue
3
3 import json
4 4
5 5 class ZMQStreamRouter(object):
6 6
7 def __init__(self, zmq_stream):
7 def __init__(self, zmq_stream, session):
8 8 self.zmq_stream = zmq_stream
9 self.session = session
9 10 self._clients = {}
10 11 self.zmq_stream.on_recv(self._on_zmq_reply)
11 12
12 13 def register_client(self, client):
13 14 client_id = uuid.uuid4()
14 15 self._clients[client_id] = client
15 16 return client_id
16 17
17 18 def unregister_client(self, client_id):
18 19 del self._clients[client_id]
19 20
20 21 def copy_clients(self, router):
21 22 # Copy the clients of another router.
22 23 for client_id, client in router._clients.items():
23 24 client.router = self
24 25 self._clients[client_id] = client
25 26
26 27
27 28 class IOPubStreamRouter(ZMQStreamRouter):
28 29
29 30 def _on_zmq_reply(self, msg_list):
30 31 for client_id, client in self._clients.items():
31 32 for msg in msg_list:
33 print "Got message: ", msg
32 34 client.write_message(msg)
33 35
34 36 def forward_unicode(self, client_id, msg):
35 37 # This is a SUB stream that we should never write to.
36 38 pass
37 39
38 40
39 41 class ShellStreamRouter(ZMQStreamRouter):
40 42
41 def __init__(self, zmq_stream):
42 ZMQStreamRouter.__init__(self, zmq_stream)
43 def __init__(self, zmq_stream, session):
44 ZMQStreamRouter.__init__(self, zmq_stream, session)
43 45 self._request_queue = Queue()
44 46
45 47 def _on_zmq_reply(self, msg_list):
46 48 client_id = self._request_queue.get(block=False)
47 49 client = self._clients.get(client_id)
48 50 if client is not None:
49 51 for msg in msg_list:
50 52 client.write_message(msg)
51 53
52 54 def forward_unicode(self, client_id, msg):
55 print "Inbound message: ", msg
53 56 self._request_queue.put(client_id)
54 self.zmq_stream.send_unicode(msg)
55
56
57 self.session.send(self.zmq_stream, msg)
57 58
General Comments 0
You need to be logged in to leave comments. Login now