##// END OF EJS Templates
Different clients now share a single zmq session....
Brian Granger -
Show More
@@ -1,5 +1,6 b''
1 import signal
1 import signal
2 import sys
2 import sys
3 import uuid
3
4
4 from IPython.zmq.ipkernel import launch_kernel
5 from IPython.zmq.ipkernel import launch_kernel
5 from session import SessionManager
6 from session import SessionManager
@@ -30,9 +31,8 b' class KernelManager(object):'
30 else:
31 else:
31 return False
32 return False
32
33
33 def start_kernel(self, kernel_id):
34 def start_kernel(self):
34 if kernel_id in self._kernels:
35 kernel_id = str(uuid.uuid4())
35 raise DuplicateKernelError("Kernel already exists: %s" % kernel_id)
36 (process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(pylab='inline')
36 (process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(pylab='inline')
37 d = dict(
37 d = dict(
38 process = process,
38 process = process,
@@ -3,6 +3,8 b' import json'
3 import logging
3 import logging
4 import os
4 import os
5 import urllib
5 import urllib
6 import uuid
7 from Queue import Queue
6
8
7 import zmq
9 import zmq
8
10
@@ -21,8 +23,7 b' from kernelmanager import KernelManager'
21
23
22 options.define("port", default=8888, help="run on the given port", type=int)
24 options.define("port", default=8888, help="run on the given port", type=int)
23
25
24 _session_id_regex = r"(?P<session_id>\w+-\w+-\w+-\w+-\w+)"
26 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
25 _kernel_id_regex = r"(?P<kernel_id>\w+)"
26
27
27
28
28 class MainHandler(web.RequestHandler):
29 class MainHandler(web.RequestHandler):
@@ -30,79 +31,79 b' class MainHandler(web.RequestHandler):'
30 self.render('notebook.html')
31 self.render('notebook.html')
31
32
32
33
33 class BaseKernelHandler(object):
34 class KernelHandler(web.RequestHandler):
34
35 def get_kernel(self):
36 return self.application.kernel_manager
37
38 def get_session(self, kernel_id):
39 km = self.get_kernel()
40 sm = km.get_session_manager(kernel_id)
41 return sm
42
43
44 class KernelHandler(web.RequestHandler, BaseKernelHandler):
45
35
46 def get(self):
36 def get(self):
47 self.write(json.dumps(self.get_kernel().kernel_ids))
37 self.write(json.dumps(self.application.kernel_ids))
48
38
49 def post(self, *args, **kwargs):
39 def post(self):
50 kernel_id = kwargs['kernel_id']
40 kernel_id = self.application.start_kernel()
51 self.get_kernel().start_kernel(kernel_id)
41 self.application.start_session(kernel_id)
52 logging.info("Starting kernel: %s" % kernel_id)
53 self.write(json.dumps(kernel_id))
42 self.write(json.dumps(kernel_id))
54
43
55
44
56 class SessionHandler(web.RequestHandler, BaseKernelHandler):
45 class ZMQStreamRouter(object):
57
46
58 def get(self, *args, **kwargs):
47 def __init__(self, zmq_stream):
59 kernel_id = kwargs['kernel_id']
48 self.zmq_stream = zmq_stream
60 self.write(json.dumps(self.get_session(kernel_id).session_ids))
49 self._clients = {}
50 self.zmq_stream.on_recv(self._on_zmq_reply)
61
51
62 def post(self, *args, **kwargs):
52 def register_client(self, client):
63 kernel_id = kwargs['kernel_id']
53 client_id = uuid.uuid4()
64 sm = self.get_session(kernel_id)
54 self._clients[client_id] = client
65 session_id = sm.start_session()
55 return client_id
66 logging.info("Starting session: %s, %s" % (kernel_id, session_id))
67 self.write(json.dumps(session_id))
68
56
57 def unregister_client(self, client_id):
58 del self._clients[client_id]
69
59
70 class ZMQStreamHandler(websocket.WebSocketHandler, BaseKernelHandler):
71
60
72 stream_name = ''
61 class IOPubStreamRouter(ZMQStreamRouter):
73
62
74 def open(self, *args, **kwargs):
63 def _on_zmq_reply(self, msg_list):
75 kernel_id = kwargs['kernel_id']
64 for client_id, client in self._clients.items():
76 session_id = kwargs['session_id']
65 for msg in msg_list:
77 logging.info("Connection open: %s, %s" % (kernel_id,session_id))
66 client.write_message(msg)
78 sm = self.get_session(kernel_id)
79 method_name = "get_%s_stream" % self.stream_name
80 method = getattr(sm, method_name)
81 self.zmq_stream = method(session_id)
82 self.zmq_stream.on_recv(self._on_zmq_reply)
83
67
84 def on_message(self, msg):
68 def forward_unicode(self, client_id, msg):
85 logging.info("Message received: %r, %r" % (msg, self.__class__))
69 # This is a SUB stream that we should never write to.
86 logging.info(self.zmq_stream)
70 pass
87 self.zmq_stream.send_unicode(msg)
88
71
89 def on_close(self):
72
90 self.zmq_stream.close()
73 class ShellStreamRouter(ZMQStreamRouter):
74
75 def __init__(self, zmq_stream):
76 ZMQStreamRouter.__init__(self, zmq_stream)
77 self._request_queue = Queue()
91
78
92 def _on_zmq_reply(self, msg_list):
79 def _on_zmq_reply(self, msg_list):
93 for msg in msg_list:
80 client_id = self._request_queue.get(block=False)
94 logging.info("Message reply: %r" % msg)
81 client = self._clients.get(client_id)
95 self.write_message(msg)
82 if client is not None:
83 for msg in msg_list:
84 client.write_message(msg)
85
86 def forward_unicode(self, client_id, msg):
87 self._request_queue.put(client_id)
88 self.zmq_stream.send_unicode(msg)
96
89
97
90
98 class IOPubStreamHandler(ZMQStreamHandler):
91 class ZMQStreamHandler(websocket.WebSocketHandler):
99
92
100 stream_name = 'iopub'
93 def initialize(self, stream_name):
94 self.stream_name = stream_name
101
95
96 def open(self, kernel_id):
97 self.router = self.application.get_router(kernel_id, self.stream_name)
98 self.client_id = self.router.register_client(self)
99 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
102
100
103 class ShellStreamHandler(ZMQStreamHandler):
101 def on_message(self, msg):
102 self.router.forward_unicode(self.client_id, msg)
104
103
105 stream_name = 'shell'
104 def on_close(self):
105 self.router.unregister_client(self.client_id)
106 logging.info("Connection closed: %s" % self.client_id)
106
107
107
108
108 class NotebookRootHandler(web.RequestHandler):
109 class NotebookRootHandler(web.RequestHandler):
@@ -157,10 +158,9 b' class NotebookApplication(web.Application):'
157 def __init__(self):
158 def __init__(self):
158 handlers = [
159 handlers = [
159 (r"/", MainHandler),
160 (r"/", MainHandler),
160 (r"/kernels/%s" % (_kernel_id_regex,), KernelHandler),
161 (r"/kernels", KernelHandler),
161 (r"/kernels/%s/sessions" % (_kernel_id_regex,), SessionHandler),
162 (r"/kernels/%s/iopub" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='iopub')),
162 (r"/kernels/%s/sessions/%s/iopub" % (_kernel_id_regex,_session_id_regex), IOPubStreamHandler),
163 (r"/kernels/%s/shell" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='shell')),
163 (r"/kernels/%s/sessions/%s/shell" % (_kernel_id_regex,_session_id_regex), ShellStreamHandler),
164 (r"/notebooks", NotebookRootHandler),
164 (r"/notebooks", NotebookRootHandler),
165 (r"/notebooks/([^/]+)", NotebookHandler)
165 (r"/notebooks/([^/]+)", NotebookHandler)
166 ]
166 ]
@@ -169,8 +169,46 b' class NotebookApplication(web.Application):'
169 static_path=os.path.join(os.path.dirname(__file__), "static"),
169 static_path=os.path.join(os.path.dirname(__file__), "static"),
170 )
170 )
171 web.Application.__init__(self, handlers, **settings)
171 web.Application.__init__(self, handlers, **settings)
172
172 self.context = zmq.Context()
173 self.context = zmq.Context()
173 self.kernel_manager = KernelManager(self.context)
174 self.kernel_manager = KernelManager(self.context)
175 self._session_dict = {}
176 self._routers = {}
177
178 #-------------------------------------------------------------------------
179 # Methods for managing kernels and sessions
180 #-------------------------------------------------------------------------
181
182 @property
183 def kernel_ids(self):
184 return self.kernel_manager.kernel_ids
185
186 def start_kernel(self):
187 kernel_id = self.kernel_manager.start_kernel()
188 logging.info("Kernel started: %s" % kernel_id)
189 return kernel_id
190
191 def start_session(self, kernel_id):
192 sm = self.kernel_manager.get_session_manager(kernel_id)
193 session_id = sm.start_session()
194 self._session_dict[kernel_id] = session_id
195 iopub_stream = sm.get_iopub_stream(session_id)
196 shell_stream = sm.get_shell_stream(session_id)
197 iopub_router = IOPubStreamRouter(iopub_stream)
198 shell_router = ShellStreamRouter(shell_stream)
199 self._routers[(kernel_id, session_id, 'iopub')] = iopub_router
200 self._routers[(kernel_id, session_id, 'shell')] = shell_router
201 logging.info("Session started: %s, %s" % (kernel_id, session_id))
202
203 def stop_session(self, kernel_id):
204 # TODO: finish this!
205 sm = self.kernel_manager.get_session_manager(kernel_id)
206 session_id = self._session_dict[kernel_id]
207
208 def get_router(self, kernel_id, stream_name):
209 session_id = self._session_dict[kernel_id]
210 router = self._routers[(kernel_id, session_id, stream_name)]
211 return router
174
212
175
213
176 def main():
214 def main():
@@ -92,7 +92,6 b' var Notebook = function (selector) {'
92 this.element.scroll();
92 this.element.scroll();
93 this.element.data("notebook", this);
93 this.element.data("notebook", this);
94 this.next_prompt_number = 1;
94 this.next_prompt_number = 1;
95 this.next_kernel_number = 0;
96 this.kernel = null;
95 this.kernel = null;
97 this.msg_cell_map = {};
96 this.msg_cell_map = {};
98 this.bind_events();
97 this.bind_events();
@@ -429,20 +428,13 b' Notebook.prototype.expand = function (index) {'
429 // Kernel related things
428 // Kernel related things
430
429
431 Notebook.prototype.start_kernel = function () {
430 Notebook.prototype.start_kernel = function () {
432 this.kernel = new Kernel("kernel" + this.next_kernel_number);
431 this.kernel = new Kernel();
433 this.next_kernel_number = this.next_kernel_number + 1;
434 this.kernel.start_kernel(this._kernel_started, this);
432 this.kernel.start_kernel(this._kernel_started, this);
435 };
433 };
436
434
437
435
438 Notebook.prototype._kernel_started = function () {
436 Notebook.prototype._kernel_started = function () {
439 console.log("Kernel started: ", this.kernel.kernel_id);
437 console.log("Kernel started: ", this.kernel.kernel_id);
440 this.kernel.start_session(this._session_started, this);
441 };
442
443
444 Notebook.prototype._session_started = function () {
445 console.log("Session started: ", this.kernel.session_id);
446 var that = this;
438 var that = this;
447
439
448 this.kernel.shell_channel.onmessage = function (e) {
440 this.kernel.shell_channel.onmessage = function (e) {
@@ -711,11 +703,10 b' TextCell.prototype.config_mathjax = function () {'
711 //============================================================================
703 //============================================================================
712
704
713
705
714 var Kernel = function (kernel_id) {
706 var Kernel = function () {
715 this.kernel_id = kernel_id;
707 this.kernel_id = null;
716 this.base_url = "/kernels";
708 this.base_url = "/kernels";
717 this.kernel_url = this.base_url + "/" + this.kernel_id
709 this.kernel_url = null;
718 this.session_id = null;
719 };
710 };
720
711
721
712
@@ -734,32 +725,26 b' Kernel.prototype.get_msg = function (msg_type, content) {'
734 }
725 }
735
726
736 Kernel.prototype.start_kernel = function (callback, context) {
727 Kernel.prototype.start_kernel = function (callback, context) {
737 $.post(this.kernel_url, function () {
738 callback.call(context);
739 });
740 };
741
742
743 Kernel.prototype.start_session = function (callback, context) {
744 var that = this;
728 var that = this;
745 $.post(this.kernel_url + "/sessions",
729 $.post(this.base_url,
746 function (session_id) {
730 function (kernel_id) {
747 that._handle_start_session(session_id, callback, context);
731 that._handle_start_kernel(kernel_id, callback, context);
748 },
732 },
749 'json');
733 'json'
750 }
734 );
735 };
751
736
752
737
753 Kernel.prototype._handle_start_session = function (session_id, callback, context) {
738 Kernel.prototype._handle_start_kernel = function (kernel_id, callback, context) {
754 this.session_id = session_id;
739 this.kernel_id = kernel_id;
755 this.session_url = this.kernel_url + "/sessions/" + this.session_id;
740 this.kernel_url = this.base_url + "/" + this.kernel_id;
756 this._start_channels();
741 this._start_channels();
757 callback.call(context);
742 callback.call(context);
758 };
743 };
759
744
760
745
761 Kernel.prototype._start_channels = function () {
746 Kernel.prototype._start_channels = function () {
762 var ws_url = "ws://127.0.0.1:8888" + this.session_url;
747 var ws_url = "ws://127.0.0.1:8888" + this.kernel_url;
763 this.shell_channel = new WebSocket(ws_url + "/shell");
748 this.shell_channel = new WebSocket(ws_url + "/shell");
764 this.iopub_channel = new WebSocket(ws_url + "/iopub");
749 this.iopub_channel = new WebSocket(ws_url + "/iopub");
765 }
750 }
General Comments 0
You need to be logged in to leave comments. Login now