Show More
@@ -1,117 +1,117 b'' | |||||
1 | """Routers that connect WebSockets to ZMQ sockets.""" |
|
1 | """Routers that connect WebSockets to ZMQ sockets.""" | |
2 |
|
2 | |||
3 | #----------------------------------------------------------------------------- |
|
3 | #----------------------------------------------------------------------------- | |
4 | # Copyright (C) 2011 The IPython Development Team |
|
4 | # Copyright (C) 2011 The IPython Development Team | |
5 | # |
|
5 | # | |
6 | # Distributed under the terms of the BSD License. The full license is in |
|
6 | # Distributed under the terms of the BSD License. The full license is in | |
7 | # the file COPYING.txt, distributed as part of this software. |
|
7 | # the file COPYING.txt, distributed as part of this software. | |
8 | #----------------------------------------------------------------------------- |
|
8 | #----------------------------------------------------------------------------- | |
9 |
|
9 | |||
10 | #----------------------------------------------------------------------------- |
|
10 | #----------------------------------------------------------------------------- | |
11 | # Imports |
|
11 | # Imports | |
12 | #----------------------------------------------------------------------------- |
|
12 | #----------------------------------------------------------------------------- | |
13 |
|
13 | |||
14 | import uuid |
|
14 | import uuid | |
15 | from Queue import Queue |
|
15 | from Queue import Queue | |
16 | import json |
|
16 | import json | |
17 |
|
17 | |||
18 | from IPython.config.configurable import Configurable |
|
18 | from IPython.config.configurable import Configurable | |
19 | from IPython.utils.traitlets import Instance, Int, Dict |
|
19 | from IPython.utils.traitlets import Instance, Int, Dict | |
20 |
|
20 | |||
21 | #----------------------------------------------------------------------------- |
|
21 | #----------------------------------------------------------------------------- | |
22 | # Classes |
|
22 | # Classes | |
23 | #----------------------------------------------------------------------------- |
|
23 | #----------------------------------------------------------------------------- | |
24 |
|
24 | |||
25 | class ZMQStreamRouter(Configurable): |
|
25 | class ZMQStreamRouter(Configurable): | |
26 |
|
26 | |||
27 | zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
27 | zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream') | |
28 | session = Instance('IPython.zmq.session.Session') |
|
28 | session = Instance('IPython.zmq.session.Session') | |
29 | max_msg_size = Int(2048, config=True, help=""" |
|
29 | max_msg_size = Int(2048, config=True, help=""" | |
30 | The max raw message size accepted from the browser |
|
30 | The max raw message size accepted from the browser | |
31 | over a WebSocket connection. |
|
31 | over a WebSocket connection. | |
32 | """) |
|
32 | """) | |
33 |
|
33 | |||
34 | _clients = Dict() |
|
34 | _clients = Dict() | |
35 |
|
35 | |||
36 | def __init__(self, **kwargs): |
|
36 | def __init__(self, **kwargs): | |
37 | super(ZMQStreamRouter,self).__init__(**kwargs) |
|
37 | super(ZMQStreamRouter,self).__init__(**kwargs) | |
38 | self.zmq_stream.on_recv(self._on_zmq_reply) |
|
38 | self.zmq_stream.on_recv(self._on_zmq_reply) | |
39 |
|
39 | |||
40 | def register_client(self, client): |
|
40 | def register_client(self, client): | |
41 | """Register a client, returning a client uuid.""" |
|
41 | """Register a client, returning a client uuid.""" | |
42 | client_id = uuid.uuid4() |
|
42 | client_id = uuid.uuid4() | |
43 | self._clients[client_id] = client |
|
43 | self._clients[client_id] = client | |
44 | return client_id |
|
44 | return client_id | |
45 |
|
45 | |||
46 | def unregister_client(self, client_id): |
|
46 | def unregister_client(self, client_id): | |
47 | """Unregister a client by its client uuid.""" |
|
47 | """Unregister a client by its client uuid.""" | |
48 | del self._clients[client_id] |
|
48 | del self._clients[client_id] | |
49 |
|
49 | |||
50 | def copy_clients(self, router): |
|
50 | def copy_clients(self, router): | |
51 | """Copy the clients of another router to this one. |
|
51 | """Copy the clients of another router to this one. | |
52 |
|
52 | |||
53 | This is used to enable the backend zeromq stream to disconnect |
|
53 | This is used to enable the backend zeromq stream to disconnect | |
54 | and reconnect while the WebSocket connections to browsers |
|
54 | and reconnect while the WebSocket connections to browsers | |
55 | remain, such as when a kernel is restarted. |
|
55 | remain, such as when a kernel is restarted. | |
56 | """ |
|
56 | """ | |
57 | for client_id, client in router._clients.items(): |
|
57 | for client_id, client in router._clients.items(): | |
58 | client.router = self |
|
58 | client.router = self | |
59 | self._clients[client_id] = client |
|
59 | self._clients[client_id] = client | |
60 |
|
60 | |||
61 | def forward_msg(self, client_id, msg): |
|
61 | def forward_msg(self, client_id, msg): | |
62 | """Forward a msg to a client by its id. |
|
62 | """Forward a msg to a client by its id. | |
63 |
|
63 | |||
64 | The default implementation of this will fail silently if a message |
|
64 | The default implementation of this will fail silently if a message | |
65 | arrives on a socket that doesn't support it. This method should |
|
65 | arrives on a socket that doesn't support it. This method should | |
66 | use max_msg_size to check and silently discard message that are too |
|
66 | use max_msg_size to check and silently discard message that are too | |
67 | long.""" |
|
67 | long.""" | |
68 | pass |
|
68 | pass | |
69 |
|
69 | |||
70 | def _on_zmq_reply(self, msg_list): |
|
70 | def _on_zmq_reply(self, msg_list): | |
71 | """Handle a message the ZMQ stream sends to the router. |
|
71 | """Handle a message the ZMQ stream sends to the router. | |
72 |
|
72 | |||
73 | Usually, this is where the return message will be written to |
|
73 | Usually, this is where the return message will be written to | |
74 | clients that need it using client.write_message(). |
|
74 | clients that need it using client.write_message(). | |
75 | """ |
|
75 | """ | |
76 | pass |
|
76 | pass | |
77 |
|
77 | |||
78 | def _reserialize_reply(self, msg_list): |
|
78 | def _reserialize_reply(self, msg_list): | |
79 | """Reserialize a reply message using JSON. |
|
79 | """Reserialize a reply message using JSON. | |
80 |
|
80 | |||
81 | This takes the msg list from the ZMQ socket, unserializes it using |
|
81 | This takes the msg list from the ZMQ socket, unserializes it using | |
82 | self.session and then serializes the result using JSON. This method |
|
82 | self.session and then serializes the result using JSON. This method | |
83 | should be used by self._on_zmq_reply to build messages that can |
|
83 | should be used by self._on_zmq_reply to build messages that can | |
84 | be sent back to the browser. |
|
84 | be sent back to the browser. | |
85 | """ |
|
85 | """ | |
86 | idents, msg_list = self.session.feed_identities(msg_list) |
|
86 | idents, msg_list = self.session.feed_identities(msg_list) | |
87 |
msg = self.session.un |
|
87 | msg = self.session.unserialize(msg_list) | |
88 | msg['header'].pop('date') |
|
88 | msg['header'].pop('date') | |
89 | return json.dumps(msg) |
|
89 | return json.dumps(msg) | |
90 |
|
90 | |||
91 |
|
91 | |||
92 | class IOPubStreamRouter(ZMQStreamRouter): |
|
92 | class IOPubStreamRouter(ZMQStreamRouter): | |
93 |
|
93 | |||
94 | def _on_zmq_reply(self, msg_list): |
|
94 | def _on_zmq_reply(self, msg_list): | |
95 | msg = self._reserialize_reply(msg_list) |
|
95 | msg = self._reserialize_reply(msg_list) | |
96 | for client_id, client in self._clients.items(): |
|
96 | for client_id, client in self._clients.items(): | |
97 | client.write_message(msg) |
|
97 | client.write_message(msg) | |
98 |
|
98 | |||
99 |
|
99 | |||
100 | class ShellStreamRouter(ZMQStreamRouter): |
|
100 | class ShellStreamRouter(ZMQStreamRouter): | |
101 |
|
101 | |||
102 | _request_queue = Instance(Queue,(),{}) |
|
102 | _request_queue = Instance(Queue,(),{}) | |
103 |
|
103 | |||
104 | def _on_zmq_reply(self, msg_list): |
|
104 | def _on_zmq_reply(self, msg_list): | |
105 | msg = self._reserialize_reply(msg_list) |
|
105 | msg = self._reserialize_reply(msg_list) | |
106 | client_id = self._request_queue.get(block=False) |
|
106 | client_id = self._request_queue.get(block=False) | |
107 | client = self._clients.get(client_id) |
|
107 | client = self._clients.get(client_id) | |
108 | if client is not None: |
|
108 | if client is not None: | |
109 | client.write_message(msg) |
|
109 | client.write_message(msg) | |
110 |
|
110 | |||
111 | def forward_msg(self, client_id, msg): |
|
111 | def forward_msg(self, client_id, msg): | |
112 | if len(msg) < self.max_msg_size: |
|
112 | if len(msg) < self.max_msg_size: | |
113 | msg = json.loads(msg) |
|
113 | msg = json.loads(msg) | |
114 | to_send = self.session.serialize(msg) |
|
114 | to_send = self.session.serialize(msg) | |
115 | self._request_queue.put(client_id) |
|
115 | self._request_queue.put(client_id) | |
116 | self.session.send(self.zmq_stream, msg) |
|
116 | self.session.send(self.zmq_stream, msg) | |
117 |
|
117 |
General Comments 0
You need to be logged in to leave comments.
Login now