##// END OF EJS Templates
Merge branch 'kernelproxy' into upstream_master
Brian Granger -
r2607:0ccb242e merge
parent child Browse files
Show More
@@ -0,0 +1,271 b''
1 """Kernel frontend classes.
2
3 To do:
4
5 1. Create custom channel subclasses for Qt.
6 2. Create logger to handle debugging and console messages.
7
8 """
9
10 from Queue import Queue, Empty
11 from threading import Thread
12 import traceback
13
14 import zmq
15 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq.eventloop import ioloop
17 from session import Session
18
19
20 class MissingHandlerError(Exception):
21 pass
22
23
24 class KernelManager(object):
25
26 def __init__(self, xreq_addr, sub_addr, rep_addr,
27 context=None, session=None):
28 self.context = zmq.Context() if context is None else context
29 self.session = Session() if session is None else session
30 self.xreq_addr = xreq_addr
31 self.sub_addr = sub_addr
32 self.rep_addr = rep_addr
33
34 def start_kernel(self):
35 """Start a localhost kernel on ip and port.
36
37 The SUB channel is for the frontend to receive messages published by
38 the kernel.
39
40 The REQ channel is for the frontend to make requests of the kernel.
41
42 The REP channel is for the kernel to request stdin (raw_input) from
43 the frontend.
44 """
45
46 def kill_kernel(self):
47 """Kill the running kernel"""
48
49 def is_alive(self):
50 """Is the kernel alive?"""
51 return True
52
53 def signal_kernel(self, signum):
54 """Send signum to the kernel."""
55
56 def get_sub_channel(self):
57 """Get the SUB socket channel object."""
58 return SubSocketChannel(self.context, self.session, self.sub_addr)
59
60 def get_xreq_channel(self):
61 """Get the REQ socket channel object to make requests of the kernel."""
62 return XReqSocketChannel(self.context, self.session, self.xreq_addr)
63
64 def get_rep_channel(self):
65 """Get the REP socket channel object to handle stdin (raw_input)."""
66 return RepSocketChannel(self.context, self.session, self.rep_addr)
67
68
69 class ZmqSocketChannel(Thread):
70
71 socket = None
72
73 def __init__(self, context, session, addr):
74 self.context = context
75 self.session = session
76 self.addr = addr
77 super(ZmqSocketChannel, self).__init__()
78 self.daemon = True
79
80
81 class SubSocketChannel(ZmqSocketChannel):
82
83 handlers = None
84 _overriden_call_handler = None
85
86 def __init__(self, context, session, addr):
87 self.handlers = {}
88 super(SubSocketChannel, self).__init__(context, session, addr)
89
90 def run(self):
91 self.socket = self.context.socket(zmq.SUB)
92 self.socket.setsockopt(zmq.SUBSCRIBE,'')
93 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
94 self.socket.connect('tcp://%s:%i' % self.addr)
95 self.ioloop = ioloop.IOLoop()
96 self.ioloop.add_handler(self.socket, self._handle_events,
97 POLLIN|POLLERR)
98 self.ioloop.start()
99
100 def _handle_events(self, socket, events):
101 # Turn on and off POLLOUT depending on if we have made a request
102 if events & POLLERR:
103 self._handle_err()
104 if events & POLLIN:
105 self._handle_recv()
106
107 def _handle_err(self):
108 raise zmq.ZmqError()
109
110 def _handle_recv(self):
111 msg = self.socket.recv_json()
112 self.call_handlers(msg)
113
114 def override_call_handler(self, func):
115 """Permanently override the call_handler.
116
117 The function func will be called as::
118
119 func(handler, msg)
120
121 And must call::
122
123 handler(msg)
124
125 in the main thread.
126 """
127 assert callable(func), "not a callable: %r" % func
128 self._overriden_call_handler = func
129
130 def call_handlers(self, msg):
131 handler = self.handlers.get(msg['msg_type'], None)
132 if handler is not None:
133 try:
134 self.call_handler(handler, msg)
135 except:
136 # XXX: This should be logged at least
137 traceback.print_last()
138
139 def call_handler(self, handler, msg):
140 if self._overriden_call_handler is not None:
141 self._overriden_call_handler(handler, msg)
142 elif hasattr(self, '_call_handler'):
143 call_handler = getattr(self, '_call_handler')
144 call_handler(handler, msg)
145 else:
146 raise RuntimeError('no handler!')
147
148 def add_handler(self, callback, msg_type):
149 """Register a callback for msg type."""
150 self.handlers[msg_type] = callback
151
152 def remove_handler(self, msg_type):
153 self.handlers.pop(msg_type, None)
154
155
156 class XReqSocketChannel(ZmqSocketChannel):
157
158 handler_queue = None
159 command_queue = None
160 handlers = None
161 _overriden_call_handler = None
162
163 def __init__(self, context, session, addr):
164 self.handlers = {}
165 self.handler_queue = Queue()
166 self.command_queue = Queue()
167 super(XReqSocketChannel, self).__init__(context, session, addr)
168
169 def run(self):
170 self.socket = self.context.socket(zmq.XREQ)
171 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 self.socket.connect('tcp://%s:%i' % self.addr)
173 self.ioloop = ioloop.IOLoop()
174 self.ioloop.add_handler(self.socket, self._handle_events,
175 POLLIN|POLLOUT|POLLERR)
176 self.ioloop.start()
177
178 def _handle_events(self, socket, events):
179 # Turn on and off POLLOUT depending on if we have made a request
180 if events & POLLERR:
181 self._handle_err()
182 if events & POLLOUT:
183 self._handle_send()
184 if events & POLLIN:
185 self._handle_recv()
186
187 def _handle_recv(self):
188 msg = self.socket.recv_json()
189 print "Got reply:", msg
190 try:
191 handler = self.handler_queue.get(False)
192 except Empty:
193 print "Message received with no handler!!!"
194 print msg
195 else:
196 self.call_handler(handler, msg)
197
198 def _handle_send(self):
199 try:
200 msg = self.command_queue.get(False)
201 except Empty:
202 pass
203 else:
204 self.socket.send_json(msg)
205
206 def _handle_err(self):
207 raise zmq.ZmqError()
208
209 def _queue_request(self, msg, callback):
210 handler = self._find_handler(msg['msg_type'], callback)
211 self.handler_queue.put(handler)
212 self.command_queue.put(msg)
213
214 def execute(self, code, callback=None):
215 # Create class for content/msg creation. Related to, but possibly
216 # not in Session.
217 content = dict(code=code)
218 msg = self.session.msg('execute_request', content)
219 self._queue_request(msg, callback)
220 return msg['header']['msg_id']
221
222 def complete(self, text, line, block=None, callback=None):
223 content = dict(text=text, line=line)
224 msg = self.session.msg('complete_request', content)
225 return self._queue_request(msg, callback)
226 return msg['header']['msg_id']
227
228 def object_info(self, oname, callback=None):
229 content = dict(oname=oname)
230 msg = self.session.msg('object_info_request', content)
231 return self._queue_request(msg, callback)
232 return msg['header']['msg_id']
233
234 def _find_handler(self, name, callback):
235 if callback is not None:
236 return callback
237 handler = self.handlers.get(name)
238 if handler is None:
239 raise MissingHandlerError('No handler defined for method: %s' % name)
240 return handler
241
242 def override_call_handler(self, func):
243 """Permanently override the call_handler.
244
245 The function func will be called as::
246
247 func(handler, msg)
248
249 And must call::
250
251 handler(msg)
252
253 in the main thread.
254 """
255 assert callable(func), "not a callable: %r" % func
256 self._overriden_call_handler = func
257
258 def call_handler(self, handler, msg):
259 if self._overriden_call_handler is not None:
260 self._overriden_call_handler(handler, msg)
261 elif hasattr(self, '_call_handler'):
262 call_handler = getattr(self, '_call_handler')
263 call_handler(handler, msg)
264 else:
265 raise RuntimeError('no handler!')
266
267
268 class RepSocketChannel(ZmqSocketChannel):
269
270 def on_raw_input():
271 pass
@@ -0,0 +1,52 b''
1 from Queue import Queue, Empty
2 import time
3
4 from kernelmanager import KernelManager
5
6 xreq_addr = ('127.0.0.1',5575)
7 sub_addr = ('127.0.0.1', 5576)
8 rep_addr = ('127.0.0.1', 5577)
9
10
11 km = KernelManager(xreq_addr, sub_addr, rep_addr)
12 # xreq_channel = km.get_xreq_channel()
13 sub_channel = km.get_sub_channel()
14
15 # xreq_channel.start()
16 sub_channel.start()
17
18 print "Channels are started"
19
20 def printer(msg):
21 print
22 print msg
23
24 class CallHandler(object):
25
26 def __init__(self):
27 self.queue = Queue()
28
29 def __call__(self, handler, msg):
30 self.queue.put((handler, msg))
31
32 def handle(self):
33 try:
34 handler, msg = self.queue.get(block=False)
35 except Empty:
36 pass
37 else:
38 handler(msg)
39
40 call_handler = CallHandler()
41 sub_channel.override_call_handler(call_handler)
42 sub_channel.add_handler(printer, 'pyin')
43 sub_channel.add_handler(printer, 'pyout')
44 sub_channel.add_handler(printer, 'stdout')
45 sub_channel.add_handler(printer, 'stderr')
46
47 for i in range(100):
48 call_handler.handle()
49 time.sleep(1)
50
51 # xreq_channel.join()
52 sub_channel.join() No newline at end of file
@@ -1,270 +1,272 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Finish implementing `raw_input`.
6 * Finish implementing `raw_input`.
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 call set_parent on all the PUB objects with the message about to be executed.
8 call set_parent on all the PUB objects with the message about to be executed.
9 * Implement random port and security key logic.
9 * Implement random port and security key logic.
10 * Implement control messages.
10 * Implement control messages.
11 * Implement event loop and poll version.
11 * Implement event loop and poll version.
12 """
12 """
13
13
14 import __builtin__
14 import __builtin__
15 import sys
15 import sys
16 import time
16 import time
17 import traceback
17 import traceback
18
18
19 from code import CommandCompiler
19 from code import CommandCompiler
20
20
21 import zmq
21 import zmq
22
22
23 from session import Session, Message, extract_header
23 from session import Session, Message, extract_header
24 from completer import KernelCompleter
24 from completer import KernelCompleter
25
25
26 class OutStream(object):
26 class OutStream(object):
27 """A file like object that publishes the stream to a 0MQ PUB socket."""
27 """A file like object that publishes the stream to a 0MQ PUB socket."""
28
28
29 def __init__(self, session, pub_socket, name, max_buffer=200):
29 def __init__(self, session, pub_socket, name, max_buffer=200):
30 self.session = session
30 self.session = session
31 self.pub_socket = pub_socket
31 self.pub_socket = pub_socket
32 self.name = name
32 self.name = name
33 self._buffer = []
33 self._buffer = []
34 self._buffer_len = 0
34 self._buffer_len = 0
35 self.max_buffer = max_buffer
35 self.max_buffer = max_buffer
36 self.parent_header = {}
36 self.parent_header = {}
37
37
38 def set_parent(self, parent):
38 def set_parent(self, parent):
39 self.parent_header = extract_header(parent)
39 self.parent_header = extract_header(parent)
40
40
41 def close(self):
41 def close(self):
42 self.pub_socket = None
42 self.pub_socket = None
43
43
44 def flush(self):
44 def flush(self):
45 if self.pub_socket is None:
45 if self.pub_socket is None:
46 raise ValueError(u'I/O operation on closed file')
46 raise ValueError(u'I/O operation on closed file')
47 else:
47 else:
48 if self._buffer:
48 if self._buffer:
49 data = ''.join(self._buffer)
49 data = ''.join(self._buffer)
50 content = {u'name':self.name, u'data':data}
50 content = {u'name':self.name, u'data':data}
51 msg = self.session.msg(u'stream', content=content,
51 msg = self.session.msg(u'stream', content=content,
52 parent=self.parent_header)
52 parent=self.parent_header)
53 print>>sys.__stdout__, Message(msg)
53 print>>sys.__stdout__, Message(msg)
54 self.pub_socket.send_json(msg)
54 self.pub_socket.send_json(msg)
55 self._buffer_len = 0
55 self._buffer_len = 0
56 self._buffer = []
56 self._buffer = []
57
57
58 def isattr(self):
58 def isattr(self):
59 return False
59 return False
60
60
61 def next(self):
61 def next(self):
62 raise IOError('Read not supported on a write only stream.')
62 raise IOError('Read not supported on a write only stream.')
63
63
64 def read(self, size=None):
64 def read(self, size=None):
65 raise IOError('Read not supported on a write only stream.')
65 raise IOError('Read not supported on a write only stream.')
66
66
67 readline=read
67 readline=read
68
68
69 def write(self, s):
69 def write(self, s):
70 if self.pub_socket is None:
70 if self.pub_socket is None:
71 raise ValueError('I/O operation on closed file')
71 raise ValueError('I/O operation on closed file')
72 else:
72 else:
73 self._buffer.append(s)
73 self._buffer.append(s)
74 self._buffer_len += len(s)
74 self._buffer_len += len(s)
75 self._maybe_send()
75 self._maybe_send()
76
76
77 def _maybe_send(self):
77 def _maybe_send(self):
78 if '\n' in self._buffer[-1]:
78 if '\n' in self._buffer[-1]:
79 self.flush()
79 self.flush()
80 if self._buffer_len > self.max_buffer:
80 if self._buffer_len > self.max_buffer:
81 self.flush()
81 self.flush()
82
82
83 def writelines(self, sequence):
83 def writelines(self, sequence):
84 if self.pub_socket is None:
84 if self.pub_socket is None:
85 raise ValueError('I/O operation on closed file')
85 raise ValueError('I/O operation on closed file')
86 else:
86 else:
87 for s in sequence:
87 for s in sequence:
88 self.write(s)
88 self.write(s)
89
89
90
90
91 class DisplayHook(object):
91 class DisplayHook(object):
92
92
93 def __init__(self, session, pub_socket):
93 def __init__(self, session, pub_socket):
94 self.session = session
94 self.session = session
95 self.pub_socket = pub_socket
95 self.pub_socket = pub_socket
96 self.parent_header = {}
96 self.parent_header = {}
97
97
98 def __call__(self, obj):
98 def __call__(self, obj):
99 if obj is None:
99 if obj is None:
100 return
100 return
101
101
102 __builtin__._ = obj
102 __builtin__._ = obj
103 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
103 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
104 parent=self.parent_header)
104 parent=self.parent_header)
105 self.pub_socket.send_json(msg)
105 self.pub_socket.send_json(msg)
106
106
107 def set_parent(self, parent):
107 def set_parent(self, parent):
108 self.parent_header = extract_header(parent)
108 self.parent_header = extract_header(parent)
109
109
110
110
111 class RawInput(object):
111 class RawInput(object):
112
112
113 def __init__(self, session, socket):
113 def __init__(self, session, socket):
114 self.session = session
114 self.session = session
115 self.socket = socket
115 self.socket = socket
116
116
117 def __call__(self, prompt=None):
117 def __call__(self, prompt=None):
118 msg = self.session.msg(u'raw_input')
118 msg = self.session.msg(u'raw_input')
119 self.socket.send_json(msg)
119 self.socket.send_json(msg)
120 while True:
120 while True:
121 try:
121 try:
122 reply = self.socket.recv_json(zmq.NOBLOCK)
122 reply = self.socket.recv_json(zmq.NOBLOCK)
123 except zmq.ZMQError, e:
123 except zmq.ZMQError, e:
124 if e.errno == zmq.EAGAIN:
124 if e.errno == zmq.EAGAIN:
125 pass
125 pass
126 else:
126 else:
127 raise
127 raise
128 else:
128 else:
129 break
129 break
130 return reply[u'content'][u'data']
130 return reply[u'content'][u'data']
131
131
132
132
133 class Kernel(object):
133 class Kernel(object):
134
134
135 def __init__(self, session, reply_socket, pub_socket):
135 def __init__(self, session, reply_socket, pub_socket):
136 self.session = session
136 self.session = session
137 self.reply_socket = reply_socket
137 self.reply_socket = reply_socket
138 self.pub_socket = pub_socket
138 self.pub_socket = pub_socket
139 self.user_ns = {}
139 self.user_ns = {}
140 self.history = []
140 self.history = []
141 self.compiler = CommandCompiler()
141 self.compiler = CommandCompiler()
142 self.completer = KernelCompleter(self.user_ns)
142 self.completer = KernelCompleter(self.user_ns)
143
143
144 # Build dict of handlers for message types
144 # Build dict of handlers for message types
145 self.handlers = {}
145 self.handlers = {}
146 for msg_type in ['execute_request', 'complete_request']:
146 for msg_type in ['execute_request', 'complete_request']:
147 self.handlers[msg_type] = getattr(self, msg_type)
147 self.handlers[msg_type] = getattr(self, msg_type)
148
148
149 def abort_queue(self):
149 def abort_queue(self):
150 while True:
150 while True:
151 try:
151 try:
152 ident = self.reply_socket.recv(zmq.NOBLOCK)
152 ident = self.reply_socket.recv(zmq.NOBLOCK)
153 except zmq.ZMQError, e:
153 except zmq.ZMQError, e:
154 if e.errno == zmq.EAGAIN:
154 if e.errno == zmq.EAGAIN:
155 break
155 break
156 else:
156 else:
157 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
157 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
158 msg = self.reply_socket.recv_json()
158 msg = self.reply_socket.recv_json()
159 print>>sys.__stdout__, "Aborting:"
159 print>>sys.__stdout__, "Aborting:"
160 print>>sys.__stdout__, Message(msg)
160 print>>sys.__stdout__, Message(msg)
161 msg_type = msg['msg_type']
161 msg_type = msg['msg_type']
162 reply_type = msg_type.split('_')[0] + '_reply'
162 reply_type = msg_type.split('_')[0] + '_reply'
163 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
163 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
164 print>>sys.__stdout__, Message(reply_msg)
164 print>>sys.__stdout__, Message(reply_msg)
165 self.reply_socket.send(ident,zmq.SNDMORE)
165 self.reply_socket.send(ident,zmq.SNDMORE)
166 self.reply_socket.send_json(reply_msg)
166 self.reply_socket.send_json(reply_msg)
167 # We need to wait a bit for requests to come in. This can probably
167 # We need to wait a bit for requests to come in. This can probably
168 # be set shorter for true asynchronous clients.
168 # be set shorter for true asynchronous clients.
169 time.sleep(0.1)
169 time.sleep(0.1)
170
170
171 def execute_request(self, ident, parent):
171 def execute_request(self, ident, parent):
172 try:
172 try:
173 code = parent[u'content'][u'code']
173 code = parent[u'content'][u'code']
174 except:
174 except:
175 print>>sys.__stderr__, "Got bad msg: "
175 print>>sys.__stderr__, "Got bad msg: "
176 print>>sys.__stderr__, Message(parent)
176 print>>sys.__stderr__, Message(parent)
177 return
177 return
178 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
178 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
179 self.pub_socket.send_json(pyin_msg)
179 self.pub_socket.send_json(pyin_msg)
180 try:
180 try:
181 comp_code = self.compiler(code, '<zmq-kernel>')
181 comp_code = self.compiler(code, '<zmq-kernel>')
182 sys.displayhook.set_parent(parent)
182 sys.displayhook.set_parent(parent)
183 exec comp_code in self.user_ns, self.user_ns
183 exec comp_code in self.user_ns, self.user_ns
184 except:
184 except:
185 result = u'error'
185 result = u'error'
186 etype, evalue, tb = sys.exc_info()
186 etype, evalue, tb = sys.exc_info()
187 tb = traceback.format_exception(etype, evalue, tb)
187 tb = traceback.format_exception(etype, evalue, tb)
188 exc_content = {
188 exc_content = {
189 u'status' : u'error',
189 u'status' : u'error',
190 u'traceback' : tb,
190 u'traceback' : tb,
191 u'etype' : unicode(etype),
191 u'etype' : unicode(etype),
192 u'evalue' : unicode(evalue)
192 u'evalue' : unicode(evalue)
193 }
193 }
194 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
194 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
195 self.pub_socket.send_json(exc_msg)
195 self.pub_socket.send_json(exc_msg)
196 reply_content = exc_content
196 reply_content = exc_content
197 else:
197 else:
198 reply_content = {'status' : 'ok'}
198 reply_content = {'status' : 'ok'}
199 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
199 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
200 print>>sys.__stdout__, Message(reply_msg)
200 print>>sys.__stdout__, Message(reply_msg)
201 self.reply_socket.send(ident, zmq.SNDMORE)
201 self.reply_socket.send(ident, zmq.SNDMORE)
202 self.reply_socket.send_json(reply_msg)
202 self.reply_socket.send_json(reply_msg)
203 if reply_msg['content']['status'] == u'error':
203 if reply_msg['content']['status'] == u'error':
204 self.abort_queue()
204 self.abort_queue()
205
205
206 def complete_request(self, ident, parent):
206 def complete_request(self, ident, parent):
207 matches = {'matches' : self.complete(parent),
207 matches = {'matches' : self.complete(parent),
208 'status' : 'ok'}
208 'status' : 'ok'}
209 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
209 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
210 matches, parent, ident)
210 matches, parent, ident)
211 print >> sys.__stdout__, completion_msg
211 print >> sys.__stdout__, completion_msg
212
212
213 def complete(self, msg):
213 def complete(self, msg):
214 return self.completer.complete(msg.content.line, msg.content.text)
214 return self.completer.complete(msg.content.line, msg.content.text)
215
215
216 def start(self):
216 def start(self):
217 while True:
217 while True:
218 ident = self.reply_socket.recv()
218 ident = self.reply_socket.recv()
219 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
219 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
220 msg = self.reply_socket.recv_json()
220 msg = self.reply_socket.recv_json()
221 omsg = Message(msg)
221 omsg = Message(msg)
222 print>>sys.__stdout__
222 print>>sys.__stdout__, omsg
223 print>>sys.__stdout__, omsg
223 handler = self.handlers.get(omsg.msg_type, None)
224 handler = self.handlers.get(omsg.msg_type, None)
224 if handler is None:
225 if handler is None:
225 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
226 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
226 else:
227 else:
227 handler(ident, omsg)
228 handler(ident, omsg)
228
229
229
230
230 def main():
231 def main():
231 c = zmq.Context()
232 c = zmq.Context()
232
233
233 ip = '127.0.0.1'
234 ip = '127.0.0.1'
234 port_base = 5575
235 port_base = 5575
235 connection = ('tcp://%s' % ip) + ':%i'
236 connection = ('tcp://%s' % ip) + ':%i'
236 rep_conn = connection % port_base
237 rep_conn = connection % port_base
237 pub_conn = connection % (port_base+1)
238 pub_conn = connection % (port_base+1)
238
239
239 print >>sys.__stdout__, "Starting the kernel..."
240 print >>sys.__stdout__, "Starting the kernel..."
240 print >>sys.__stdout__, "On:",rep_conn, pub_conn
241 print >>sys.__stdout__, "XREP Channel:", rep_conn
242 print >>sys.__stdout__, "PUB Channel:", pub_conn
241
243
242 session = Session(username=u'kernel')
244 session = Session(username=u'kernel')
243
245
244 reply_socket = c.socket(zmq.XREP)
246 reply_socket = c.socket(zmq.XREP)
245 reply_socket.bind(rep_conn)
247 reply_socket.bind(rep_conn)
246
248
247 pub_socket = c.socket(zmq.PUB)
249 pub_socket = c.socket(zmq.PUB)
248 pub_socket.bind(pub_conn)
250 pub_socket.bind(pub_conn)
249
251
250 stdout = OutStream(session, pub_socket, u'stdout')
252 stdout = OutStream(session, pub_socket, u'stdout')
251 stderr = OutStream(session, pub_socket, u'stderr')
253 stderr = OutStream(session, pub_socket, u'stderr')
252 sys.stdout = stdout
254 sys.stdout = stdout
253 sys.stderr = stderr
255 sys.stderr = stderr
254
256
255 display_hook = DisplayHook(session, pub_socket)
257 display_hook = DisplayHook(session, pub_socket)
256 sys.displayhook = display_hook
258 sys.displayhook = display_hook
257
259
258 kernel = Kernel(session, reply_socket, pub_socket)
260 kernel = Kernel(session, reply_socket, pub_socket)
259
261
260 # For debugging convenience, put sleep and a string in the namespace, so we
262 # For debugging convenience, put sleep and a string in the namespace, so we
261 # have them every time we start.
263 # have them every time we start.
262 kernel.user_ns['sleep'] = time.sleep
264 kernel.user_ns['sleep'] = time.sleep
263 kernel.user_ns['s'] = 'Test string'
265 kernel.user_ns['s'] = 'Test string'
264
266
265 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
267 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
266 kernel.start()
268 kernel.start()
267
269
268
270
269 if __name__ == '__main__':
271 if __name__ == '__main__':
270 main()
272 main()
@@ -1,119 +1,122 b''
1 import os
1 import os
2 import uuid
2 import uuid
3 import pprint
3 import pprint
4
4
5 import zmq
5 import zmq
6
6
7 class Message(object):
7 class Message(object):
8 """A simple message object that maps dict keys to attributes.
8 """A simple message object that maps dict keys to attributes.
9
9
10 A Message can be created from a dict and a dict from a Message instance
10 A Message can be created from a dict and a dict from a Message instance
11 simply by calling dict(msg_obj)."""
11 simply by calling dict(msg_obj)."""
12
12
13 def __init__(self, msg_dict):
13 def __init__(self, msg_dict):
14 dct = self.__dict__
14 dct = self.__dict__
15 for k, v in msg_dict.iteritems():
15 for k, v in msg_dict.iteritems():
16 if isinstance(v, dict):
16 if isinstance(v, dict):
17 v = Message(v)
17 v = Message(v)
18 dct[k] = v
18 dct[k] = v
19
19
20 # Having this iterator lets dict(msg_obj) work out of the box.
20 # Having this iterator lets dict(msg_obj) work out of the box.
21 def __iter__(self):
21 def __iter__(self):
22 return iter(self.__dict__.iteritems())
22 return iter(self.__dict__.iteritems())
23
23
24 def __repr__(self):
24 def __repr__(self):
25 return repr(self.__dict__)
25 return repr(self.__dict__)
26
26
27 def __str__(self):
27 def __str__(self):
28 return pprint.pformat(self.__dict__)
28 return pprint.pformat(self.__dict__)
29
29
30 def __contains__(self, k):
30 def __contains__(self, k):
31 return k in self.__dict__
31 return k in self.__dict__
32
32
33 def __getitem__(self, k):
33 def __getitem__(self, k):
34 return self.__dict__[k]
34 return self.__dict__[k]
35
35
36
36
37 def msg_header(msg_id, username, session):
37 def msg_header(msg_id, username, session):
38 return {
38 return {
39 'msg_id' : msg_id,
39 'msg_id' : msg_id,
40 'username' : username,
40 'username' : username,
41 'session' : session
41 'session' : session
42 }
42 }
43
43
44
44
45 def extract_header(msg_or_header):
45 def extract_header(msg_or_header):
46 """Given a message or header, return the header."""
46 """Given a message or header, return the header."""
47 if not msg_or_header:
47 if not msg_or_header:
48 return {}
48 return {}
49 try:
49 try:
50 # See if msg_or_header is the entire message.
50 # See if msg_or_header is the entire message.
51 h = msg_or_header['header']
51 h = msg_or_header['header']
52 except KeyError:
52 except KeyError:
53 try:
53 try:
54 # See if msg_or_header is just the header
54 # See if msg_or_header is just the header
55 h = msg_or_header['msg_id']
55 h = msg_or_header['msg_id']
56 except KeyError:
56 except KeyError:
57 raise
57 raise
58 else:
58 else:
59 h = msg_or_header
59 h = msg_or_header
60 if not isinstance(h, dict):
60 if not isinstance(h, dict):
61 h = dict(h)
61 h = dict(h)
62 return h
62 return h
63
63
64
64
65 class Session(object):
65 class Session(object):
66
66
67 def __init__(self, username=os.environ.get('USER','username')):
67 def __init__(self, username=os.environ.get('USER','username'), session=None):
68 self.username = username
68 self.username = username
69 self.session = str(uuid.uuid4())
69 if session is None:
70 self.session = str(uuid.uuid4())
71 else:
72 self.session = session
70 self.msg_id = 0
73 self.msg_id = 0
71
74
72 def msg_header(self):
75 def msg_header(self):
73 h = msg_header(self.msg_id, self.username, self.session)
76 h = msg_header(self.msg_id, self.username, self.session)
74 self.msg_id += 1
77 self.msg_id += 1
75 return h
78 return h
76
79
77 def msg(self, msg_type, content=None, parent=None):
80 def msg(self, msg_type, content=None, parent=None):
78 msg = {}
81 msg = {}
79 msg['header'] = self.msg_header()
82 msg['header'] = self.msg_header()
80 msg['parent_header'] = {} if parent is None else extract_header(parent)
83 msg['parent_header'] = {} if parent is None else extract_header(parent)
81 msg['msg_type'] = msg_type
84 msg['msg_type'] = msg_type
82 msg['content'] = {} if content is None else content
85 msg['content'] = {} if content is None else content
83 return msg
86 return msg
84
87
85 def send(self, socket, msg_type, content=None, parent=None, ident=None):
88 def send(self, socket, msg_type, content=None, parent=None, ident=None):
86 msg = self.msg(msg_type, content, parent)
89 msg = self.msg(msg_type, content, parent)
87 if ident is not None:
90 if ident is not None:
88 socket.send(ident, zmq.SNDMORE)
91 socket.send(ident, zmq.SNDMORE)
89 socket.send_json(msg)
92 socket.send_json(msg)
90 omsg = Message(msg)
93 omsg = Message(msg)
91 return omsg
94 return omsg
92
95
93 def recv(self, socket, mode=zmq.NOBLOCK):
96 def recv(self, socket, mode=zmq.NOBLOCK):
94 try:
97 try:
95 msg = socket.recv_json(mode)
98 msg = socket.recv_json(mode)
96 except zmq.ZMQError, e:
99 except zmq.ZMQError, e:
97 if e.errno == zmq.EAGAIN:
100 if e.errno == zmq.EAGAIN:
98 # We can convert EAGAIN to None as we know in this case
101 # We can convert EAGAIN to None as we know in this case
99 # recv_json won't return None.
102 # recv_json won't return None.
100 return None
103 return None
101 else:
104 else:
102 raise
105 raise
103 return Message(msg)
106 return Message(msg)
104
107
105 def test_msg2obj():
108 def test_msg2obj():
106 am = dict(x=1)
109 am = dict(x=1)
107 ao = Message(am)
110 ao = Message(am)
108 assert ao.x == am['x']
111 assert ao.x == am['x']
109
112
110 am['y'] = dict(z=1)
113 am['y'] = dict(z=1)
111 ao = Message(am)
114 ao = Message(am)
112 assert ao.y.z == am['y']['z']
115 assert ao.y.z == am['y']['z']
113
116
114 k1, k2 = 'y', 'z'
117 k1, k2 = 'y', 'z'
115 assert ao[k1][k2] == am[k1][k2]
118 assert ao[k1][k2] == am[k1][k2]
116
119
117 am2 = dict(ao)
120 am2 = dict(ao)
118 assert am['x'] == am2['x']
121 assert am['x'] == am2['x']
119 assert am['y']['z'] == am2['y']['z']
122 assert am['y']['z'] == am2['y']['z']
General Comments 0
You need to be logged in to leave comments. Login now