##// END OF EJS Templates
Change unpack_message to unserialize in routers.py.
Brian E. Granger -
Show More
@@ -1,117 +1,117 b''
1 1 """Routers that connect WebSockets to ZMQ sockets."""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING.txt, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 import uuid
15 15 from Queue import Queue
16 16 import json
17 17
18 18 from IPython.config.configurable import Configurable
19 19 from IPython.utils.traitlets import Instance, Int, Dict
20 20
21 21 #-----------------------------------------------------------------------------
22 22 # Classes
23 23 #-----------------------------------------------------------------------------
24 24
25 25 class ZMQStreamRouter(Configurable):
26 26
27 27 zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
28 28 session = Instance('IPython.zmq.session.Session')
29 29 max_msg_size = Int(2048, config=True, help="""
30 30 The max raw message size accepted from the browser
31 31 over a WebSocket connection.
32 32 """)
33 33
34 34 _clients = Dict()
35 35
36 36 def __init__(self, **kwargs):
37 37 super(ZMQStreamRouter,self).__init__(**kwargs)
38 38 self.zmq_stream.on_recv(self._on_zmq_reply)
39 39
40 40 def register_client(self, client):
41 41 """Register a client, returning a client uuid."""
42 42 client_id = uuid.uuid4()
43 43 self._clients[client_id] = client
44 44 return client_id
45 45
46 46 def unregister_client(self, client_id):
47 47 """Unregister a client by its client uuid."""
48 48 del self._clients[client_id]
49 49
50 50 def copy_clients(self, router):
51 51 """Copy the clients of another router to this one.
52 52
53 53 This is used to enable the backend zeromq stream to disconnect
54 54 and reconnect while the WebSocket connections to browsers
55 55 remain, such as when a kernel is restarted.
56 56 """
57 57 for client_id, client in router._clients.items():
58 58 client.router = self
59 59 self._clients[client_id] = client
60 60
61 61 def forward_msg(self, client_id, msg):
62 62 """Forward a msg to a client by its id.
63 63
64 64 The default implementation of this will fail silently if a message
65 65 arrives on a socket that doesn't support it. This method should
66 66 use max_msg_size to check and silently discard message that are too
67 67 long."""
68 68 pass
69 69
70 70 def _on_zmq_reply(self, msg_list):
71 71 """Handle a message the ZMQ stream sends to the router.
72 72
73 73 Usually, this is where the return message will be written to
74 74 clients that need it using client.write_message().
75 75 """
76 76 pass
77 77
78 78 def _reserialize_reply(self, msg_list):
79 79 """Reserialize a reply message using JSON.
80 80
81 81 This takes the msg list from the ZMQ socket, unserializes it using
82 82 self.session and then serializes the result using JSON. This method
83 83 should be used by self._on_zmq_reply to build messages that can
84 84 be sent back to the browser.
85 85 """
86 86 idents, msg_list = self.session.feed_identities(msg_list)
87 msg = self.session.unpack_message(msg_list)
87 msg = self.session.unserialize(msg_list)
88 88 msg['header'].pop('date')
89 89 return json.dumps(msg)
90 90
91 91
92 92 class IOPubStreamRouter(ZMQStreamRouter):
93 93
94 94 def _on_zmq_reply(self, msg_list):
95 95 msg = self._reserialize_reply(msg_list)
96 96 for client_id, client in self._clients.items():
97 97 client.write_message(msg)
98 98
99 99
100 100 class ShellStreamRouter(ZMQStreamRouter):
101 101
102 102 _request_queue = Instance(Queue,(),{})
103 103
104 104 def _on_zmq_reply(self, msg_list):
105 105 msg = self._reserialize_reply(msg_list)
106 106 client_id = self._request_queue.get(block=False)
107 107 client = self._clients.get(client_id)
108 108 if client is not None:
109 109 client.write_message(msg)
110 110
111 111 def forward_msg(self, client_id, msg):
112 112 if len(msg) < self.max_msg_size:
113 113 msg = json.loads(msg)
114 114 to_send = self.session.serialize(msg)
115 115 self._request_queue.put(client_id)
116 116 self.session.send(self.zmq_stream, msg)
117 117
General Comments 0
You need to be logged in to leave comments. Login now