##// END OF EJS Templates
Change unpack_message to unserialize in routers.py.
Brian E. Granger -
Show More
@@ -1,117 +1,117 b''
1 """Routers that connect WebSockets to ZMQ sockets."""
1 """Routers that connect WebSockets to ZMQ sockets."""
2
2
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING.txt, distributed as part of this software.
7 # the file COPYING.txt, distributed as part of this software.
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 import uuid
14 import uuid
15 from Queue import Queue
15 from Queue import Queue
16 import json
16 import json
17
17
18 from IPython.config.configurable import Configurable
18 from IPython.config.configurable import Configurable
19 from IPython.utils.traitlets import Instance, Int, Dict
19 from IPython.utils.traitlets import Instance, Int, Dict
20
20
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22 # Classes
22 # Classes
23 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
24
24
25 class ZMQStreamRouter(Configurable):
25 class ZMQStreamRouter(Configurable):
26
26
27 zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
27 zmq_stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
28 session = Instance('IPython.zmq.session.Session')
28 session = Instance('IPython.zmq.session.Session')
29 max_msg_size = Int(2048, config=True, help="""
29 max_msg_size = Int(2048, config=True, help="""
30 The max raw message size accepted from the browser
30 The max raw message size accepted from the browser
31 over a WebSocket connection.
31 over a WebSocket connection.
32 """)
32 """)
33
33
34 _clients = Dict()
34 _clients = Dict()
35
35
36 def __init__(self, **kwargs):
36 def __init__(self, **kwargs):
37 super(ZMQStreamRouter,self).__init__(**kwargs)
37 super(ZMQStreamRouter,self).__init__(**kwargs)
38 self.zmq_stream.on_recv(self._on_zmq_reply)
38 self.zmq_stream.on_recv(self._on_zmq_reply)
39
39
40 def register_client(self, client):
40 def register_client(self, client):
41 """Register a client, returning a client uuid."""
41 """Register a client, returning a client uuid."""
42 client_id = uuid.uuid4()
42 client_id = uuid.uuid4()
43 self._clients[client_id] = client
43 self._clients[client_id] = client
44 return client_id
44 return client_id
45
45
46 def unregister_client(self, client_id):
46 def unregister_client(self, client_id):
47 """Unregister a client by its client uuid."""
47 """Unregister a client by its client uuid."""
48 del self._clients[client_id]
48 del self._clients[client_id]
49
49
50 def copy_clients(self, router):
50 def copy_clients(self, router):
51 """Copy the clients of another router to this one.
51 """Copy the clients of another router to this one.
52
52
53 This is used to enable the backend zeromq stream to disconnect
53 This is used to enable the backend zeromq stream to disconnect
54 and reconnect while the WebSocket connections to browsers
54 and reconnect while the WebSocket connections to browsers
55 remain, such as when a kernel is restarted.
55 remain, such as when a kernel is restarted.
56 """
56 """
57 for client_id, client in router._clients.items():
57 for client_id, client in router._clients.items():
58 client.router = self
58 client.router = self
59 self._clients[client_id] = client
59 self._clients[client_id] = client
60
60
61 def forward_msg(self, client_id, msg):
61 def forward_msg(self, client_id, msg):
62 """Forward a msg to a client by its id.
62 """Forward a msg to a client by its id.
63
63
64 The default implementation of this will fail silently if a message
64 The default implementation of this will fail silently if a message
65 arrives on a socket that doesn't support it. This method should
65 arrives on a socket that doesn't support it. This method should
66 use max_msg_size to check and silently discard message that are too
66 use max_msg_size to check and silently discard message that are too
67 long."""
67 long."""
68 pass
68 pass
69
69
70 def _on_zmq_reply(self, msg_list):
70 def _on_zmq_reply(self, msg_list):
71 """Handle a message the ZMQ stream sends to the router.
71 """Handle a message the ZMQ stream sends to the router.
72
72
73 Usually, this is where the return message will be written to
73 Usually, this is where the return message will be written to
74 clients that need it using client.write_message().
74 clients that need it using client.write_message().
75 """
75 """
76 pass
76 pass
77
77
78 def _reserialize_reply(self, msg_list):
78 def _reserialize_reply(self, msg_list):
79 """Reserialize a reply message using JSON.
79 """Reserialize a reply message using JSON.
80
80
81 This takes the msg list from the ZMQ socket, unserializes it using
81 This takes the msg list from the ZMQ socket, unserializes it using
82 self.session and then serializes the result using JSON. This method
82 self.session and then serializes the result using JSON. This method
83 should be used by self._on_zmq_reply to build messages that can
83 should be used by self._on_zmq_reply to build messages that can
84 be sent back to the browser.
84 be sent back to the browser.
85 """
85 """
86 idents, msg_list = self.session.feed_identities(msg_list)
86 idents, msg_list = self.session.feed_identities(msg_list)
87 msg = self.session.unpack_message(msg_list)
87 msg = self.session.unserialize(msg_list)
88 msg['header'].pop('date')
88 msg['header'].pop('date')
89 return json.dumps(msg)
89 return json.dumps(msg)
90
90
91
91
92 class IOPubStreamRouter(ZMQStreamRouter):
92 class IOPubStreamRouter(ZMQStreamRouter):
93
93
94 def _on_zmq_reply(self, msg_list):
94 def _on_zmq_reply(self, msg_list):
95 msg = self._reserialize_reply(msg_list)
95 msg = self._reserialize_reply(msg_list)
96 for client_id, client in self._clients.items():
96 for client_id, client in self._clients.items():
97 client.write_message(msg)
97 client.write_message(msg)
98
98
99
99
100 class ShellStreamRouter(ZMQStreamRouter):
100 class ShellStreamRouter(ZMQStreamRouter):
101
101
102 _request_queue = Instance(Queue,(),{})
102 _request_queue = Instance(Queue,(),{})
103
103
104 def _on_zmq_reply(self, msg_list):
104 def _on_zmq_reply(self, msg_list):
105 msg = self._reserialize_reply(msg_list)
105 msg = self._reserialize_reply(msg_list)
106 client_id = self._request_queue.get(block=False)
106 client_id = self._request_queue.get(block=False)
107 client = self._clients.get(client_id)
107 client = self._clients.get(client_id)
108 if client is not None:
108 if client is not None:
109 client.write_message(msg)
109 client.write_message(msg)
110
110
111 def forward_msg(self, client_id, msg):
111 def forward_msg(self, client_id, msg):
112 if len(msg) < self.max_msg_size:
112 if len(msg) < self.max_msg_size:
113 msg = json.loads(msg)
113 msg = json.loads(msg)
114 to_send = self.session.serialize(msg)
114 to_send = self.session.serialize(msg)
115 self._request_queue.put(client_id)
115 self._request_queue.put(client_id)
116 self.session.send(self.zmq_stream, msg)
116 self.session.send(self.zmq_stream, msg)
117
117
General Comments 0
You need to be logged in to leave comments. Login now