##// END OF EJS Templates
Refactor and simplification of zmqterminal.
Thomas Kluyver -
Show More
@@ -1,51 +1,42 b''
1 1 # -*- coding: utf-8 -*-
2 2 import readline
3 import time
4 import sys
5 stdout = sys.stdout
6
7 class TimeoutError(Exception):
8 pass
3 from Queue import Empty
9 4
10 5 class ClientCompleter2p(object):
11 6 """Client-side completion machinery.
12 7
13 8 How it works: self.complete will be called multiple times, with
14 9 state=0,1,2,... When state=0 it should compute ALL the completion matches,
15 10 and then return them for each value of state."""
16 11
17 12 def __init__(self,client, km):
18 13 self.km = km
19 14 self.matches = []
20 15 self.client = client
21 16
22 17 def complete_request(self,text):
23 18 line = readline.get_line_buffer()
24 19 #msg_id = self.km.xreq_channel.complete(text=text,line=line)#this method is not working, the code not continue
25 20 msg = self.km.session.send(self.km.xreq_channel.socket,
26 21 'complete_request',
27 22 dict(text=text, line=line))
28 23 # send completion request to kernel
29 24 # Give the kernel up to 0.5s to respond
30 for i in range(5):
31 if self.km.xreq_channel.was_called():
32 msg_xreq = self.km.xreq_channel.get_msg()
33 if msg["header"]['session'] == msg_xreq["parent_header"]['session'] and \
34 msg_xreq["content"]["status"] == 'ok' and \
35 msg_xreq["msg_type"] == "complete_reply" :
36 return msg_xreq["content"]["matches"]
37
38 time.sleep(0.1)
39 raise TimeoutError
25 msg_xreq = self.km.xreq_channel.get_msg(timeout=0.5)
26 if msg["header"]['session'] == msg_xreq["parent_header"]['session'] and \
27 msg_xreq["content"]["status"] == 'ok' and \
28 msg_xreq["msg_type"] == "complete_reply" :
29 return msg_xreq["content"]["matches"]
30 return []
40 31
41 32 def complete(self, text, state):
42 33 if state == 0:
43 34 try:
44 35 self.matches = self.complete_request(text)
45 except TimeoutError:
36 except Empty:
46 37 print('WARNING: Kernel timeout on tab completion.')
47 38
48 39 try:
49 40 return self.matches[state]
50 41 except IndexError:
51 42 return None
@@ -1,280 +1,266 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Frontend of ipython working with python-zmq
3 3
4 4 Ipython's frontend, is a ipython interface that send request to kernel and proccess the kernel's outputs.
5 5
6 6 For more details, see the ipython-zmq design
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2010 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import __builtin__
20 from contextlib import nested
21 import time
22 20 import sys
23 21 import os
24 import signal
25 import uuid
26 import cPickle as pickle
27 import code
28 import zmq
22 from Queue import Empty
29 23 import readline
30 24 import rlcompleter
31 import time
32 25
33 26 #-----------------------------------------------------------------------------
34 27 # Imports from ipython
35 28 #-----------------------------------------------------------------------------
36 29 from IPython.external.argparse import ArgumentParser
37 from IPython.utils.traitlets import (
38 Int, Str, CBool, CaselessStrEnum, Enum, List, Unicode
39 )
40 from IPython.core.interactiveshell import get_default_colors
41 from IPython.core.excolors import exception_colors
42 from IPython.utils import PyColorize
43 30 from IPython.core.inputsplitter import IPythonInputSplitter
44 from IPython.frontend.zmqterminal.kernelmanager import KernelManager2p as KernelManager
45 from IPython.zmq.session import Session
31 from IPython.zmq.blockingkernelmanager import BlockingKernelManager as KernelManager
46 32 from IPython.frontend.zmqterminal.completer import ClientCompleter2p
47 33
48 34 #-----------------------------------------------------------------------------
49 35 # Network Constants
50 36 #-----------------------------------------------------------------------------
51 37
52 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
53 39 class Frontend(object):
54 """This class is a simple frontend to ipython-zmq
40 """This class is a simple frontend to ipython-zmq
55 41
56 42 NOTE: this class uses kernelmanager to manipulate sockets
57 43
58 44 Parameters:
59 45 -----------
60 46 kernelmanager : object
61 47 instantiated object from class KernelManager in module kernelmanager
62 48
63 """
49 """
64 50
65 def __init__(self, kernelmanager):
51 def __init__(self, kernelmanager):
66 52 self.km = kernelmanager
67 53 self.session = kernelmanager.session
68 54 self.request_socket = self.km.xreq_channel.socket
69 55 self.sub_socket = self.km.sub_channel.socket
70 56 self.reply_socket = self.km.rep_channel.socket
71 57 self.msg_header = self.km.session.msg_header()
72 self.completer = ClientCompleter2p(self,self.km)
58 self.completer = ClientCompleter2p(self, self.km)
73 59 readline.parse_and_bind("tab: complete")
74 60 readline.parse_and_bind('set show-all-if-ambiguous on')
75 61 readline.set_completer(self.completer.complete)
76 62
77 63 history_path = os.path.expanduser('~/.ipython/history')
78 64 if os.path.isfile(history_path):
79 65 rlcompleter.readline.read_history_file(history_path)
80 66 else:
81 67 print("history file cannot be read.")
82 68
83 69 self.messages = {}
84 70
85 71 self._splitter = IPythonInputSplitter()
86 72 self.code = ""
87 73
88 74 self.prompt_count = 0
89 75 self._get_initial_prompt()
90 76
91 def _get_initial_prompt(self):
77 def _get_initial_prompt(self):
92 78 self._execute('', hidden=True)
93 79
94 def interact(self):
80 def interact(self):
95 81 """Gets input from console using inputsplitter, then
96 82 while you enter code it can indent and set index id to any input
97 83 """
98 84
99 85 try:
100 86 self._splitter.push(raw_input('In[%i]:'%self.prompt_count+self.code))
101 87 while self._splitter.push_accepts_more():
102 88 self.code = raw_input('.....:'+' '*self._splitter.indent_spaces)
103 89 self._splitter.push(' '*self._splitter.indent_spaces+self.code)
104 90 self._execute(self._splitter.source,False)
105 91 self._splitter.reset()
106 92 except KeyboardInterrupt:
107 93 print('\nKeyboardInterrupt\n')
108 94 pass
109 95
110 96
111 def start(self):
97 def start(self):
112 98 """Start the interaction loop, calling the .interact() method for each
113 99 input cell.
114 100 """
115 101 while True:
116 102 try:
117 103 self.interact()
118 104 except KeyboardInterrupt:
119 105 print('\nKeyboardInterrupt\n')
120 106 pass
121 107 except EOFError:
122 108 answer = ''
123 109 while True:
124 110 answer = raw_input('\nDo you really want to exit ([y]/n)?')
125 111 if answer == 'y' or answer == '' :
126 112 self.km.shutdown_kernel()
127 113 sys.exit()
128 114 elif answer == 'n':
129 115 break
130 116
131 def _execute(self, source, hidden = True):
132 """ Execute 'source'. If 'hidden', do not show any output.
117 def _execute(self, source, hidden = True):
118 """ Execute 'source'. If 'hidden', do not show any output.
133 119
134 120 See parent class :meth:`execute` docstring for full details.
135 """
136 self.km.xreq_channel.execute(source, hidden)
137 self.handle_xreq_channel()
138 self.handle_rep_channel()
121 """
122 self.km.xreq_channel.execute(source, hidden)
123 while not self.km.xreq_channel.msg_ready():
124 try:
125 self.handle_rep_channel(timeout=0.1)
126 except Empty:
127 pass
128 self.handle_xreq_channel()
139 129
140 def handle_xreq_channel(self):
141 # Give the kernel up to 0.5s to respond
142 for i in range(5):
143 if self.km.xreq_channel.was_called():
144 self.msg_xreq = self.km.xreq_channel.get_msg()
145 if self.msg_header["session"] == self.msg_xreq["parent_header"]["session"] :
146 if self.msg_xreq["content"]["status"] == 'ok' :
147 if self.msg_xreq["msg_type"] == "execute_reply" :
148 self.handle_sub_channel()
149 self.prompt_count = self.msg_xreq["content"]["execution_count"]+1
150
151 else:
152 etb = self.msg_xreq["content"]["traceback"]
153 print >> sys.stderr, etb[0]
154 print >> sys.stderr, etb[1]
155 print >> sys.stderr, etb[2]
156 self.prompt_count = self.msg_xreq["content"]["execution_count"]+1
157 break
158 time.sleep(0.1)
130 def handle_xreq_channel(self):
131 self.msg_xreq = self.km.xreq_channel.get_msg()
132 if self.msg_header["session"] == self.msg_xreq["parent_header"]["session"]:
133 if self.msg_xreq["content"]["status"] == 'ok' :
134 if self.msg_xreq["msg_type"] == "execute_reply" :
135 self.handle_sub_channel()
136 self.prompt_count = self.msg_xreq["content"]["execution_count"]+1
137
138 else:
139 etb = self.msg_xreq["content"]["traceback"]
140 print >> sys.stderr, etb[0]
141 try: # These bits aren't there for a SyntaxError
142 print >> sys.stderr, etb[1]
143 print >> sys.stderr, etb[2]
144 except IndexError:
145 pass
146 self.prompt_count = self.msg_xreq["content"]["execution_count"]+1
159 147
160 148
161 def handle_sub_channel(self):
149 def handle_sub_channel(self):
162 150 """ Method to procces subscribe channel's messages
163 151
164 152 This method reads a message and processes the content in different
165 153 outputs like stdout, stderr, pyout and status
166 154
167 155 Arguments:
168 156 sub_msg: message receive from kernel in the sub socket channel
169 157 capture by kernel manager.
170 158 """
171 while self.km.sub_channel.was_called():
159 while self.km.sub_channel.msg_ready():
172 160 sub_msg = self.km.sub_channel.get_msg()
173 161 if self.msg_header["username"] == sub_msg['parent_header']['username'] and \
174 162 self.km.session.session == sub_msg['parent_header']['session']:
175 163 if sub_msg['msg_type'] == 'status' :
176 164 if sub_msg["content"]["execution_state"] == "busy" :
177 165 pass
178 166
179 167 if sub_msg['msg_type'] == 'stream' :
180 168 if sub_msg["content"]["name"] == "stdout":
181 169 print >> sys.stdout,sub_msg["content"]["data"]
182 170 sys.stdout.flush()
183 171 if sub_msg["content"]["name"] == "stderr" :
184 172 print >> sys.stderr,sub_msg["content"]["data"]
185 173 sys.stderr.flush()
186 174
187 175 if sub_msg['msg_type'] == 'pyout' :
188 176 print >> sys.stdout,"Out[%i]:"%sub_msg["content"]["execution_count"], sub_msg["content"]["data"]["text/plain"]
189 177 sys.stdout.flush()
190 178
191 def handle_rep_channel(self):
192 """ Method to capture raw_input
193 """
194 if self.km.rep_channel.was_called() :
195 self.msg_rep = self.km.rep_channel.get_msg()
196 if self.msg_header["session"] == self.msg_rep["parent_header"]["session"] :
197 raw_data = raw_input(self.msg_rep["content"]["prompt"])
198 self.km.rep_channel.input(raw_data)
179 def handle_rep_channel(self, timeout=0.1):
180 """ Method to capture raw_input
181 """
182 self.msg_rep = self.km.rep_channel.get_msg(timeout=timeout)
183 if self.msg_header["session"] == self.msg_rep["parent_header"]["session"] :
184 raw_data = raw_input(self.msg_rep["content"]["prompt"])
185 self.km.rep_channel.input(raw_data)
199 186
200 187
201 188
202 189
203 190 def start_frontend():
204 191 """ Entry point for application.
205 192
206 193 """
207 194 # Parse command line arguments.
208 195 parser = ArgumentParser()
209 196 kgroup = parser.add_argument_group('kernel options')
210 197 kgroup.add_argument('-e', '--existing', action='store_true',
211 198 help='connect to an existing kernel')
212 199 kgroup.add_argument('--ip', type=str, default=LOCALHOST,
213 200 help=\
214 201 "set the kernel\'s IP address [default localhost].\
215 202 If the IP address is something other than localhost, then \
216 203 Consoles on other machines will be able to connect\
217 204 to the Kernel, so be careful!")
218 205 kgroup.add_argument('--xreq', type=int, metavar='PORT', default=0,
219 206 help='set the XREQ channel port [default random]')
220 207 kgroup.add_argument('--sub', type=int, metavar='PORT', default=0,
221 208 help='set the SUB channel port [default random]')
222 209 kgroup.add_argument('--rep', type=int, metavar='PORT', default=0,
223 210 help='set the REP channel port [default random]')
224 211 kgroup.add_argument('--hb', type=int, metavar='PORT', default=0,
225 212 help='set the heartbeat port [default random]')
226 213
227 214 egroup = kgroup.add_mutually_exclusive_group()
228 215 egroup.add_argument('--pure', action='store_true', help = \
229 216 'use a pure Python kernel instead of an IPython kernel')
230 217 egroup.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
231 218 const='auto', help = \
232 219 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
233 220 given, the GUI backend is matplotlib's, otherwise use one of: \
234 221 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
235 222 egroup.add_argument('--colors', type=str,
236 223 help="Set the color scheme (LightBG,Linux,NoColor). This is guessed\
237 224 based on the pygments style if not set.")
238 225
239 226 args = parser.parse_args()
240 227
241 228 # parse the colors arg down to current known labels
242 229 if args.colors:
243 230 colors=args.colors.lower()
244 231 if colors in ('lightbg', 'light'):
245 232 colors='lightbg'
246 233 elif colors in ('dark', 'linux'):
247 234 colors='linux'
248 235 else:
249 236 colors='nocolor'
250 237 else:
251 238 colors=None
252 239
253 240 # Create a KernelManager and start a kernel.
254 241 kernel_manager = KernelManager(xreq_address=(args.ip, args.xreq),
255 242 sub_address=(args.ip, args.sub),
256 243 rep_address=(args.ip, args.rep),
257 244 hb_address=(args.ip, args.hb))
258 245 if not args.existing:
259 246 # if not args.ip in LOCAL_IPS+ALL_ALIAS:
260 247 # raise ValueError("Must bind a local ip, such as: %s"%LOCAL_IPS)
261 248
262 249 kwargs = dict(ip=args.ip)
263 250 if args.pure:
264 251 kwargs['ipython']=False
265 252 else:
266 253 kwargs['colors']=colors
267 254 if args.pylab:
268 255 kwargs['pylab']=args.pylab
269 256 kernel_manager.start_kernel(**kwargs)
270 257
271 258
272 259 kernel_manager.start_channels()
273 time.sleep(4)
274 260
275 261 frontend=Frontend(kernel_manager)
276 262 return frontend
277 263
278 264 if __name__ == "__main__" :
279 265 frontend=start_frontend()
280 266 frontend.start()
@@ -1,129 +1,152 b''
1 1 """Implement a fully blocking kernel manager.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010-2011 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 # Stdlib
18 18 from Queue import Queue, Empty
19 from threading import Event
19 20
20 21 # Our own
21 22 from IPython.utils import io
22 23 from IPython.utils.traitlets import Type
23 24
24 25 from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
25 26 ShellSocketChannel, StdInSocketChannel)
26 27
27 28 #-----------------------------------------------------------------------------
28 29 # Functions and classes
29 30 #-----------------------------------------------------------------------------
30 31
31 32 class BlockingSubSocketChannel(SubSocketChannel):
32 33
33 34 def __init__(self, context, session, address=None):
34 35 super(BlockingSubSocketChannel, self).__init__(context, session,
35 36 address)
36 37 self._in_queue = Queue()
37 38
38 39 def call_handlers(self, msg):
39 40 #io.rprint('[[Sub]]', msg) # dbg
40 41 self._in_queue.put(msg)
41 42
42 43 def msg_ready(self):
43 44 """Is there a message that has been received?"""
44 45 if self._in_queue.qsize() == 0:
45 46 return False
46 47 else:
47 48 return True
48 49
49 50 def get_msg(self, block=True, timeout=None):
50 51 """Get a message if there is one that is ready."""
51 52 if block and timeout is None:
52 53 # never use timeout=None, because get
53 54 # becomes uninterruptible
54 55 timeout = 1e6
55 56 return self._in_queue.get(block, timeout)
56 57
57 58 def get_msgs(self):
58 59 """Get all messages that are currently ready."""
59 60 msgs = []
60 61 while True:
61 62 try:
62 63 msgs.append(self.get_msg(block=False))
63 64 except Empty:
64 65 break
65 66 return msgs
66 67
67 68
68 69 class BlockingShellSocketChannel(ShellSocketChannel):
69 70
70 71 def __init__(self, context, session, address=None):
71 72 super(BlockingShellSocketChannel, self).__init__(context, session,
72 73 address)
73 74 self._in_queue = Queue()
74 75
75 76 def call_handlers(self, msg):
76 77 #io.rprint('[[Shell]]', msg) # dbg
77 78 self._in_queue.put(msg)
78 79
79 80 def msg_ready(self):
80 81 """Is there a message that has been received?"""
81 82 if self._in_queue.qsize() == 0:
82 83 return False
83 84 else:
84 85 return True
85 86
86 87 def get_msg(self, block=True, timeout=None):
87 88 """Get a message if there is one that is ready."""
88 89 if block and timeout is None:
89 90 # never use timeout=None, because get
90 91 # becomes uninterruptible
91 92 timeout = 1e6
92 93 return self._in_queue.get(block, timeout)
93 94
94 95 def get_msgs(self):
95 96 """Get all messages that are currently ready."""
96 97 msgs = []
97 98 while True:
98 99 try:
99 100 msgs.append(self.get_msg(block=False))
100 101 except Empty:
101 102 break
102 103 return msgs
103 104
104 105
105 106 class BlockingStdInSocketChannel(StdInSocketChannel):
106 107
108 def __init__(self, context, session, address=None):
109 super(BlockingRepSocketChannel, self).__init__(context, session, address)
110 self._in_queue = Queue()
111
107 112 def call_handlers(self, msg):
108 113 #io.rprint('[[Rep]]', msg) # dbg
109 pass
114 self._in_queue.put(msg)
115
116 def get_msg(self, block=True, timeout=None):
117 "Gets a message if there is one that is ready."
118 return self._in_queue.get(block, timeout)
119
120 def get_msgs(self):
121 """Get all messages that are currently ready."""
122 msgs = []
123 while True:
124 try:
125 msgs.append(self.get_msg(block=False))
126 except Empty:
127 break
128 return msgs
129
130 def msg_ready(self):
131 "Is there a message that has been received?"
132 return not self._in_queue.empty()
110 133
111 134
112 135 class BlockingHBSocketChannel(HBSocketChannel):
113 136
114 137 # This kernel needs rapid monitoring capabilities
115 138 time_to_dead = 0.2
116 139
117 140 def call_handlers(self, since_last_heartbeat):
118 141 #io.rprint('[[Heart]]', since_last_heartbeat) # dbg
119 142 pass
120 143
121 144
122 145 class BlockingKernelManager(KernelManager):
123 146
124 147 # The classes to use for the various channels.
125 148 shell_channel_class = Type(BlockingShellSocketChannel)
126 149 sub_channel_class = Type(BlockingSubSocketChannel)
127 150 stdin_channel_class = Type(BlockingStdInSocketChannel)
128 151 hb_channel_class = Type(BlockingHBSocketChannel)
129 152
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now