##// END OF EJS Templates
Added files from our zmq prototype into main ipython tree....
Fernando Perez -
Show More
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
@@ -0,0 +1,86 b''
1 """Tab-completion over zmq"""
2
3 # Trying to get print statements to work during completion, not very
4 # successfully...
5 from __future__ import print_function
6
7 import itertools
8 import readline
9 import rlcompleter
10 import time
11
12 import session
13
14 class KernelCompleter(object):
15 """Kernel-side completion machinery."""
16 def __init__(self, namespace):
17 self.namespace = namespace
18 self.completer = rlcompleter.Completer(namespace)
19
20 def complete(self, line, text):
21 # We'll likely use linel later even if now it's not used for anything
22 matches = []
23 complete = self.completer.complete
24 for state in itertools.count():
25 comp = complete(text, state)
26 if comp is None:
27 break
28 matches.append(comp)
29 return matches
30
31
32 class ClientCompleter(object):
33 """Client-side completion machinery.
34
35 How it works: self.complete will be called multiple times, with
36 state=0,1,2,... When state=0 it should compute ALL the completion matches,
37 and then return them for each value of state."""
38
39 def __init__(self, client, session, socket):
40 # ugly, but we get called asynchronously and need access to some
41 # client state, like backgrounded code
42 self.client = client
43 self.session = session
44 self.socket = socket
45 self.matches = []
46
47 def request_completion(self, text):
48 # Get full line to give to the kernel in case it wants more info.
49 line = readline.get_line_buffer()
50 # send completion request to kernel
51 msg = self.session.send(self.socket,
52 'complete_request',
53 dict(text=text, line=line))
54
55 # Give the kernel up to 0.5s to respond
56 for i in range(5):
57 rep = self.session.recv(self.socket)
58 if rep is not None and rep.msg_type == 'complete_reply':
59 matches = rep.content.matches
60 break
61 time.sleep(0.1)
62 else:
63 # timeout
64 print ('TIMEOUT') # Can't see this message...
65 matches = None
66 return matches
67
68 def complete(self, text, state):
69
70 if self.client.backgrounded > 0:
71 print("\n[Not completing, background tasks active]")
72 print(readline.get_line_buffer(), end='')
73 return None
74
75 if state==0:
76 matches = self.request_completion(text)
77 if matches is None:
78 self.matches = []
79 print('WARNING: Kernel timeout on tab completion.')
80 else:
81 self.matches = matches
82
83 try:
84 return self.matches[state]
85 except IndexError:
86 return None
@@ -0,0 +1,194 b''
1 #!/usr/bin/env python
2 """A simple interactive frontend that talks to a kernel over 0MQ.
3 """
4
5 #-----------------------------------------------------------------------------
6 # Imports
7 #-----------------------------------------------------------------------------
8 # stdlib
9 import cPickle as pickle
10 import code
11 import readline
12 import sys
13 import time
14 import uuid
15
16 # our own
17 import zmq
18 import session
19 import completer
20
21 #-----------------------------------------------------------------------------
22 # Classes and functions
23 #-----------------------------------------------------------------------------
24
25 class Console(code.InteractiveConsole):
26
27 def __init__(self, locals=None, filename="<console>",
28 session = session,
29 request_socket=None,
30 sub_socket=None):
31 code.InteractiveConsole.__init__(self, locals, filename)
32 self.session = session
33 self.request_socket = request_socket
34 self.sub_socket = sub_socket
35 self.backgrounded = 0
36 self.messages = {}
37
38 # Set tab completion
39 self.completer = completer.ClientCompleter(self, session, request_socket)
40 readline.parse_and_bind('tab: complete')
41 readline.parse_and_bind('set show-all-if-ambiguous on')
42 readline.set_completer(self.completer.complete)
43
44 # Set system prompts
45 sys.ps1 = 'Py>>> '
46 sys.ps2 = ' ... '
47 sys.ps3 = 'Out : '
48 # Build dict of handlers for message types
49 self.handlers = {}
50 for msg_type in ['pyin', 'pyout', 'pyerr', 'stream']:
51 self.handlers[msg_type] = getattr(self, 'handle_%s' % msg_type)
52
53 def handle_pyin(self, omsg):
54 if omsg.parent_header.session == self.session.session:
55 return
56 c = omsg.content.code.rstrip()
57 if c:
58 print '[IN from %s]' % omsg.parent_header.username
59 print c
60
61 def handle_pyout(self, omsg):
62 #print omsg # dbg
63 if omsg.parent_header.session == self.session.session:
64 print "%s%s" % (sys.ps3, omsg.content.data)
65 else:
66 print '[Out from %s]' % omsg.parent_header.username
67 print omsg.content.data
68
69 def print_pyerr(self, err):
70 print >> sys.stderr, err.etype,':', err.evalue
71 print >> sys.stderr, ''.join(err.traceback)
72
73 def handle_pyerr(self, omsg):
74 if omsg.parent_header.session == self.session.session:
75 return
76 print >> sys.stderr, '[ERR from %s]' % omsg.parent_header.username
77 self.print_pyerr(omsg.content)
78
79 def handle_stream(self, omsg):
80 if omsg.content.name == 'stdout':
81 outstream = sys.stdout
82 else:
83 outstream = sys.stderr
84 print >> outstream, '*ERR*',
85 print >> outstream, omsg.content.data,
86
87 def handle_output(self, omsg):
88 handler = self.handlers.get(omsg.msg_type, None)
89 if handler is not None:
90 handler(omsg)
91
92 def recv_output(self):
93 while True:
94 omsg = self.session.recv(self.sub_socket)
95 if omsg is None:
96 break
97 self.handle_output(omsg)
98
99 def handle_reply(self, rep):
100 # Handle any side effects on output channels
101 self.recv_output()
102 # Now, dispatch on the possible reply types we must handle
103 if rep is None:
104 return
105 if rep.content.status == 'error':
106 self.print_pyerr(rep.content)
107 elif rep.content.status == 'aborted':
108 print >> sys.stderr, "ERROR: ABORTED"
109 ab = self.messages[rep.parent_header.msg_id].content
110 if 'code' in ab:
111 print >> sys.stderr, ab.code
112 else:
113 print >> sys.stderr, ab
114
115 def recv_reply(self):
116 rep = self.session.recv(self.request_socket)
117 self.handle_reply(rep)
118 return rep
119
120 def runcode(self, code):
121 # We can't pickle code objects, so fetch the actual source
122 src = '\n'.join(self.buffer)
123
124 # for non-background inputs, if we do have previoiusly backgrounded
125 # jobs, check to see if they've produced results
126 if not src.endswith(';'):
127 while self.backgrounded > 0:
128 #print 'checking background'
129 rep = self.recv_reply()
130 if rep:
131 self.backgrounded -= 1
132 time.sleep(0.05)
133
134 # Send code execution message to kernel
135 omsg = self.session.send(self.request_socket,
136 'execute_request', dict(code=src))
137 self.messages[omsg.header.msg_id] = omsg
138
139 # Fake asynchronicity by letting the user put ';' at the end of the line
140 if src.endswith(';'):
141 self.backgrounded += 1
142 return
143
144 # For foreground jobs, wait for reply
145 while True:
146 rep = self.recv_reply()
147 if rep is not None:
148 break
149 self.recv_output()
150 time.sleep(0.05)
151 else:
152 # We exited without hearing back from the kernel!
153 print >> sys.stderr, 'ERROR!!! kernel never got back to us!!!'
154
155
156 class InteractiveClient(object):
157 def __init__(self, session, request_socket, sub_socket):
158 self.session = session
159 self.request_socket = request_socket
160 self.sub_socket = sub_socket
161 self.console = Console(None, '<zmq-console>',
162 session, request_socket, sub_socket)
163
164 def interact(self):
165 self.console.interact()
166
167
168 def main():
169 # Defaults
170 #ip = '192.168.2.109'
171 ip = '127.0.0.1'
172 #ip = '99.146.222.252'
173 port_base = 5575
174 connection = ('tcp://%s' % ip) + ':%i'
175 req_conn = connection % port_base
176 sub_conn = connection % (port_base+1)
177
178 # Create initial sockets
179 c = zmq.Context()
180 request_socket = c.socket(zmq.XREQ)
181 request_socket.connect(req_conn)
182
183 sub_socket = c.socket(zmq.SUB)
184 sub_socket.connect(sub_conn)
185 sub_socket.setsockopt(zmq.SUBSCRIBE, '')
186
187 # Make session and user-facing client
188 sess = session.Session()
189 client = InteractiveClient(sess, request_socket, sub_socket)
190 client.interact()
191
192
193 if __name__ == '__main__':
194 main()
@@ -0,0 +1,119 b''
1 import os
2 import uuid
3 import pprint
4
5 import zmq
6
7 class Message(object):
8 """A simple message object that maps dict keys to attributes.
9
10 A Message can be created from a dict and a dict from a Message instance
11 simply by calling dict(msg_obj)."""
12
13 def __init__(self, msg_dict):
14 dct = self.__dict__
15 for k, v in msg_dict.iteritems():
16 if isinstance(v, dict):
17 v = Message(v)
18 dct[k] = v
19
20 # Having this iterator lets dict(msg_obj) work out of the box.
21 def __iter__(self):
22 return iter(self.__dict__.iteritems())
23
24 def __repr__(self):
25 return repr(self.__dict__)
26
27 def __str__(self):
28 return pprint.pformat(self.__dict__)
29
30 def __contains__(self, k):
31 return k in self.__dict__
32
33 def __getitem__(self, k):
34 return self.__dict__[k]
35
36
37 def msg_header(msg_id, username, session):
38 return {
39 'msg_id' : msg_id,
40 'username' : username,
41 'session' : session
42 }
43
44
45 def extract_header(msg_or_header):
46 """Given a message or header, return the header."""
47 if not msg_or_header:
48 return {}
49 try:
50 # See if msg_or_header is the entire message.
51 h = msg_or_header['header']
52 except KeyError:
53 try:
54 # See if msg_or_header is just the header
55 h = msg_or_header['msg_id']
56 except KeyError:
57 raise
58 else:
59 h = msg_or_header
60 if not isinstance(h, dict):
61 h = dict(h)
62 return h
63
64
65 class Session(object):
66
67 def __init__(self, username=os.environ.get('USER','username')):
68 self.username = username
69 self.session = str(uuid.uuid4())
70 self.msg_id = 0
71
72 def msg_header(self):
73 h = msg_header(self.msg_id, self.username, self.session)
74 self.msg_id += 1
75 return h
76
77 def msg(self, msg_type, content=None, parent=None):
78 msg = {}
79 msg['header'] = self.msg_header()
80 msg['parent_header'] = {} if parent is None else extract_header(parent)
81 msg['msg_type'] = msg_type
82 msg['content'] = {} if content is None else content
83 return msg
84
85 def send(self, socket, msg_type, content=None, parent=None, ident=None):
86 msg = self.msg(msg_type, content, parent)
87 if ident is not None:
88 socket.send(ident, zmq.SNDMORE)
89 socket.send_json(msg)
90 omsg = Message(msg)
91 return omsg
92
93 def recv(self, socket, mode=zmq.NOBLOCK):
94 try:
95 msg = socket.recv_json(mode)
96 except zmq.ZMQError, e:
97 if e.errno == zmq.EAGAIN:
98 # We can convert EAGAIN to None as we know in this case
99 # recv_json won't return None.
100 return None
101 else:
102 raise
103 return Message(msg)
104
105 def test_msg2obj():
106 am = dict(x=1)
107 ao = Message(am)
108 assert ao.x == am['x']
109
110 am['y'] = dict(z=1)
111 ao = Message(am)
112 assert ao.y.z == am['y']['z']
113
114 k1, k2 = 'y', 'z'
115 assert ao[k1][k2] == am[k1][k2]
116
117 am2 = dict(ao)
118 assert am['x'] == am2['x']
119 assert am['y']['z'] == am2['y']['z']
@@ -0,0 +1,97 b''
1 =====================
2 Message Specification
3 =====================
4
5 Note: not all of these have yet been fully fleshed out, but the key ones are,
6 see kernel and frontend files for actual implementation details.
7
8 General Message Format
9 =====================
10
11 General message format::
12
13 {
14 header : { 'msg_id' : 10, # start with 0
15 'username' : 'name',
16 'session' : uuid
17 },
18 parent_header : dict,
19 msg_type : 'string_message_type',
20 content : blackbox_dict , # Must be a dict
21 }
22
23 Side effect: (PUB/SUB)
24 ======================
25
26 # msg_type = 'stream'
27 content = {
28 name : 'stdout',
29 data : 'blob',
30 }
31
32 # msg_type = 'pyin'
33 content = {
34 code = 'x=1',
35 }
36
37 # msg_type = 'pyout'
38 content = {
39 data = 'repr(obj)',
40 prompt_number = 10
41 }
42
43 # msg_type = 'pyerr'
44 content = {
45 traceback : 'full traceback',
46 exc_type : 'TypeError',
47 exc_value : 'msg'
48 }
49
50 # msg_type = 'file'
51 content = {
52 path = 'cool.jpg',
53 data : 'blob'
54 }
55
56 Request/Reply
57 =============
58
59 Execute
60 -------
61
62 Request:
63
64 # msg_type = 'execute_request'
65 content = {
66 code : 'a = 10',
67 }
68
69 Reply:
70
71 # msg_type = 'execute_reply'
72 content = {
73 'status' : 'ok' OR 'error' OR 'abort'
74 # data depends on status value
75 }
76
77 Complete
78 --------
79
80 # msg_type = 'complete_request'
81 content = {
82 text : 'a.f', # complete on this
83 line : 'print a.f' # full line
84 }
85
86 # msg_type = 'complete_reply'
87 content = {
88 matches : ['a.foo', 'a.bar']
89 }
90
91 Control
92 -------
93
94 # msg_type = 'heartbeat'
95 content = {
96
97 }
@@ -14,6 +14,7 b" IPython developer's guide"
14 release.txt
14 release.txt
15 roadmap.txt
15 roadmap.txt
16 reorg.txt
16 reorg.txt
17 messaging.txt
17 magic_blueprint.txt
18 magic_blueprint.txt
18 notification_blueprint.txt
19 notification_blueprint.txt
19 ipgraph.txt
20 ipgraph.txt
General Comments 0
You need to be logged in to leave comments. Login now