Show More
@@ -1,117 +1,117 b'' | |||
|
1 | 1 | """Routers that connect WebSockets to ZMQ sockets.""" |
|
2 | 2 | |
|
3 | 3 | #----------------------------------------------------------------------------- |
|
4 | 4 | # Copyright (C) 2011 The IPython Development Team |
|
5 | 5 | # |
|
6 | 6 | # Distributed under the terms of the BSD License. The full license is in |
|
7 | 7 | # the file COPYING.txt, distributed as part of this software. |
|
8 | 8 | #----------------------------------------------------------------------------- |
|
9 | 9 | |
|
10 | 10 | #----------------------------------------------------------------------------- |
|
11 | 11 | # Imports |
|
12 | 12 | #----------------------------------------------------------------------------- |
|
13 | 13 | |
|
14 | 14 | import uuid |
|
15 | 15 | from Queue import Queue |
|
16 | 16 | import json |
|
17 | 17 | |
|
18 | 18 | from IPython.config.configurable import Configurable |
|
19 | 19 | from IPython.utils.traitlets import Instance, Int, Dict |
|
20 | 20 | |
|
21 | 21 | #----------------------------------------------------------------------------- |
|
22 | 22 | # Classes |
|
23 | 23 | #----------------------------------------------------------------------------- |
|
24 | 24 | |
|
25 | 25 | class ZMQStreamRouter(Configurable): |
|
26 | 26 | |
|
27 | 27 | zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
28 | 28 | session = Instance('IPython.zmq.session.Session') |
|
29 | 29 | max_msg_size = Int(2048, config=True, help=""" |
|
30 | 30 | The max raw message size accepted from the browser |
|
31 | 31 | over a WebSocket connection. |
|
32 | 32 | """) |
|
33 | 33 | |
|
34 | 34 | _clients = Dict() |
|
35 | 35 | |
|
36 | 36 | def __init__(self, **kwargs): |
|
37 | 37 | super(ZMQStreamRouter,self).__init__(**kwargs) |
|
38 | 38 | self.zmq_stream.on_recv(self._on_zmq_reply) |
|
39 | 39 | |
|
40 | 40 | def register_client(self, client): |
|
41 | 41 | """Register a client, returning a client uuid.""" |
|
42 | 42 | client_id = uuid.uuid4() |
|
43 | 43 | self._clients[client_id] = client |
|
44 | 44 | return client_id |
|
45 | 45 | |
|
46 | 46 | def unregister_client(self, client_id): |
|
47 | 47 | """Unregister a client by its client uuid.""" |
|
48 | 48 | del self._clients[client_id] |
|
49 | 49 | |
|
50 | 50 | def copy_clients(self, router): |
|
51 | 51 | """Copy the clients of another router to this one. |
|
52 | 52 | |
|
53 | 53 | This is used to enable the backend zeromq stream to disconnect |
|
54 | 54 | and reconnect while the WebSocket connections to browsers |
|
55 | 55 | remain, such as when a kernel is restarted. |
|
56 | 56 | """ |
|
57 | 57 | for client_id, client in router._clients.items(): |
|
58 | 58 | client.router = self |
|
59 | 59 | self._clients[client_id] = client |
|
60 | 60 | |
|
61 | 61 | def forward_msg(self, client_id, msg): |
|
62 | 62 | """Forward a msg to a client by its id. |
|
63 | 63 | |
|
64 | 64 | The default implementation of this will fail silently if a message |
|
65 | 65 | arrives on a socket that doesn't support it. This method should |
|
66 | 66 | use max_msg_size to check and silently discard message that are too |
|
67 | 67 | long.""" |
|
68 | 68 | pass |
|
69 | 69 | |
|
70 | 70 | def _on_zmq_reply(self, msg_list): |
|
71 | 71 | """Handle a message the ZMQ stream sends to the router. |
|
72 | 72 | |
|
73 | 73 | Usually, this is where the return message will be written to |
|
74 | 74 | clients that need it using client.write_message(). |
|
75 | 75 | """ |
|
76 | 76 | pass |
|
77 | 77 | |
|
78 | 78 | def _reserialize_reply(self, msg_list): |
|
79 | 79 | """Reserialize a reply message using JSON. |
|
80 | 80 | |
|
81 | 81 | This takes the msg list from the ZMQ socket, unserializes it using |
|
82 | 82 | self.session and then serializes the result using JSON. This method |
|
83 | 83 | should be used by self._on_zmq_reply to build messages that can |
|
84 | 84 | be sent back to the browser. |
|
85 | 85 | """ |
|
86 | 86 | idents, msg_list = self.session.feed_identities(msg_list) |
|
87 |
msg = self.session.un |
|
|
87 | msg = self.session.unserialize(msg_list) | |
|
88 | 88 | msg['header'].pop('date') |
|
89 | 89 | return json.dumps(msg) |
|
90 | 90 | |
|
91 | 91 | |
|
92 | 92 | class IOPubStreamRouter(ZMQStreamRouter): |
|
93 | 93 | |
|
94 | 94 | def _on_zmq_reply(self, msg_list): |
|
95 | 95 | msg = self._reserialize_reply(msg_list) |
|
96 | 96 | for client_id, client in self._clients.items(): |
|
97 | 97 | client.write_message(msg) |
|
98 | 98 | |
|
99 | 99 | |
|
100 | 100 | class ShellStreamRouter(ZMQStreamRouter): |
|
101 | 101 | |
|
102 | 102 | _request_queue = Instance(Queue,(),{}) |
|
103 | 103 | |
|
104 | 104 | def _on_zmq_reply(self, msg_list): |
|
105 | 105 | msg = self._reserialize_reply(msg_list) |
|
106 | 106 | client_id = self._request_queue.get(block=False) |
|
107 | 107 | client = self._clients.get(client_id) |
|
108 | 108 | if client is not None: |
|
109 | 109 | client.write_message(msg) |
|
110 | 110 | |
|
111 | 111 | def forward_msg(self, client_id, msg): |
|
112 | 112 | if len(msg) < self.max_msg_size: |
|
113 | 113 | msg = json.loads(msg) |
|
114 | 114 | to_send = self.session.serialize(msg) |
|
115 | 115 | self._request_queue.put(client_id) |
|
116 | 116 | self.session.send(self.zmq_stream, msg) |
|
117 | 117 |
General Comments 0
You need to be logged in to leave comments.
Login now