##// END OF EJS Templates
Initial version of system command out/err forwarding.
Brian Granger -
Show More
@@ -0,0 +1,20 b''
1 import sys
2 from subprocess import Popen, PIPE
3 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
4
5
6 class ZMQInteractiveShell(InteractiveShell):
7 """A subclass of InteractiveShell for ZMQ."""
8
9 def system(self, cmd):
10 cmd = self.var_expand(cmd, depth=2)
11 p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
12 for line in p.stdout.read().split('\n'):
13 if len(line) > 0:
14 print line
15 for line in p.stderr.read().split('\n'):
16 if len(line) > 0:
17 print line
18 return p.wait()
19
20 InteractiveShellABC.register(ZMQInteractiveShell)
@@ -1,364 +1,364 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 # Standard library imports.
18 18 import __builtin__
19 19 from code import CommandCompiler
20 20 import os
21 21 import sys
22 22 import time
23 23 import traceback
24 24
25 25 # System library imports.
26 26 import zmq
27 27
28 28 # Local imports.
29 29 from IPython.config.configurable import Configurable
30 from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC
30 from IPython.zmq.zmqshell import ZMQInteractiveShell
31 31 from IPython.external.argparse import ArgumentParser
32 32 from IPython.utils.traitlets import Instance
33 33 from IPython.zmq.session import Session, Message
34 34 from completer import KernelCompleter
35 35 from iostream import OutStream
36 36 from displayhook import DisplayHook
37 37 from exitpoller import ExitPollerUnix, ExitPollerWindows
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Main kernel class
41 41 #-----------------------------------------------------------------------------
42 42
43 43 class Kernel(Configurable):
44 44
45 45 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
46 46 session = Instance('IPython.zmq.session.Session')
47 47 reply_socket = Instance('zmq.Socket')
48 48 pub_socket = Instance('zmq.Socket')
49 49 req_socket = Instance('zmq.Socket')
50 50
51 51 def __init__(self, **kwargs):
52 52 super(Kernel, self).__init__(**kwargs)
53 self.shell = InteractiveShell.instance()
53 self.shell = ZMQInteractiveShell.instance()
54 54
55 55 # Build dict of handlers for message types
56 56 msg_types = [ 'execute_request', 'complete_request',
57 57 'object_info_request' ]
58 58 self.handlers = {}
59 59 for msg_type in msg_types:
60 60 self.handlers[msg_type] = getattr(self, msg_type)
61 61
62 62 def abort_queue(self):
63 63 while True:
64 64 try:
65 65 ident = self.reply_socket.recv(zmq.NOBLOCK)
66 66 except zmq.ZMQError, e:
67 67 if e.errno == zmq.EAGAIN:
68 68 break
69 69 else:
70 70 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
71 71 msg = self.reply_socket.recv_json()
72 72 print>>sys.__stdout__, "Aborting:"
73 73 print>>sys.__stdout__, Message(msg)
74 74 msg_type = msg['msg_type']
75 75 reply_type = msg_type.split('_')[0] + '_reply'
76 76 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
77 77 print>>sys.__stdout__, Message(reply_msg)
78 78 self.reply_socket.send(ident,zmq.SNDMORE)
79 79 self.reply_socket.send_json(reply_msg)
80 80 # We need to wait a bit for requests to come in. This can probably
81 81 # be set shorter for true asynchronous clients.
82 82 time.sleep(0.1)
83 83
84 84 def execute_request(self, ident, parent):
85 85 try:
86 86 code = parent[u'content'][u'code']
87 87 except:
88 88 print>>sys.__stderr__, "Got bad msg: "
89 89 print>>sys.__stderr__, Message(parent)
90 90 return
91 91 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
92 92 self.pub_socket.send_json(pyin_msg)
93 93
94 94 try:
95 95 # Replace raw_input. Note that is not sufficient to replace
96 96 # raw_input in the user namespace.
97 97 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
98 98 __builtin__.raw_input = raw_input
99 99
100 100 # Configure the display hook.
101 101 sys.displayhook.set_parent(parent)
102 102
103 103 self.shell.runlines(code)
104 104 # exec comp_code in self.user_ns, self.user_ns
105 105 except:
106 106 etype, evalue, tb = sys.exc_info()
107 107 tb = traceback.format_exception(etype, evalue, tb)
108 108 exc_content = {
109 109 u'status' : u'error',
110 110 u'traceback' : tb,
111 111 u'ename' : unicode(etype.__name__),
112 112 u'evalue' : unicode(evalue)
113 113 }
114 114 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
115 115 self.pub_socket.send_json(exc_msg)
116 116 reply_content = exc_content
117 117 else:
118 118 reply_content = {'status' : 'ok'}
119 119
120 120 # Flush output before sending the reply.
121 121 sys.stderr.flush()
122 122 sys.stdout.flush()
123 123
124 124 # Send the reply.
125 125 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
126 126 print>>sys.__stdout__, Message(reply_msg)
127 127 self.reply_socket.send(ident, zmq.SNDMORE)
128 128 self.reply_socket.send_json(reply_msg)
129 129 if reply_msg['content']['status'] == u'error':
130 130 self.abort_queue()
131 131
132 132 def raw_input(self, prompt, ident, parent):
133 133 # Flush output before making the request.
134 134 sys.stderr.flush()
135 135 sys.stdout.flush()
136 136
137 137 # Send the input request.
138 138 content = dict(prompt=prompt)
139 139 msg = self.session.msg(u'input_request', content, parent)
140 140 self.req_socket.send_json(msg)
141 141
142 142 # Await a response.
143 143 reply = self.req_socket.recv_json()
144 144 try:
145 145 value = reply['content']['value']
146 146 except:
147 147 print>>sys.__stderr__, "Got bad raw_input reply: "
148 148 print>>sys.__stderr__, Message(parent)
149 149 value = ''
150 150 return value
151 151
152 152 def complete_request(self, ident, parent):
153 153 matches = {'matches' : self.complete(parent),
154 154 'status' : 'ok'}
155 155 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
156 156 matches, parent, ident)
157 157 print >> sys.__stdout__, completion_msg
158 158
159 159 def complete(self, msg):
160 160 return self.shell.complete(msg.content.line)
161 161
162 162 def object_info_request(self, ident, parent):
163 163 context = parent['content']['oname'].split('.')
164 164 object_info = self.object_info(context)
165 165 msg = self.session.send(self.reply_socket, 'object_info_reply',
166 166 object_info, parent, ident)
167 167 print >> sys.__stdout__, msg
168 168
169 169 def object_info(self, context):
170 170 symbol, leftover = self.symbol_from_context(context)
171 171 if symbol is not None and not leftover:
172 172 doc = getattr(symbol, '__doc__', '')
173 173 else:
174 174 doc = ''
175 175 object_info = dict(docstring = doc)
176 176 return object_info
177 177
178 178 def symbol_from_context(self, context):
179 179 if not context:
180 180 return None, context
181 181
182 182 base_symbol_string = context[0]
183 183 symbol = self.shell.user_ns.get(base_symbol_string, None)
184 184 if symbol is None:
185 185 symbol = __builtin__.__dict__.get(base_symbol_string, None)
186 186 if symbol is None:
187 187 return None, context
188 188
189 189 context = context[1:]
190 190 for i, name in enumerate(context):
191 191 new_symbol = getattr(symbol, name, None)
192 192 if new_symbol is None:
193 193 return symbol, context[i:]
194 194 else:
195 195 symbol = new_symbol
196 196
197 197 return symbol, []
198 198
199 199 def start(self):
200 200 while True:
201 201 ident = self.reply_socket.recv()
202 202 assert self.reply_socket.rcvmore(), "Missing message part."
203 203 msg = self.reply_socket.recv_json()
204 204 omsg = Message(msg)
205 205 print>>sys.__stdout__
206 206 print>>sys.__stdout__, omsg
207 207 handler = self.handlers.get(omsg.msg_type, None)
208 208 if handler is None:
209 209 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
210 210 else:
211 211 handler(ident, omsg)
212 212
213 213 #-----------------------------------------------------------------------------
214 214 # Kernel main and launch functions
215 215 #-----------------------------------------------------------------------------
216 216
217 217 def bind_port(socket, ip, port):
218 218 """ Binds the specified ZMQ socket. If the port is less than zero, a random
219 219 port is chosen. Returns the port that was bound.
220 220 """
221 221 connection = 'tcp://%s' % ip
222 222 if port <= 0:
223 223 port = socket.bind_to_random_port(connection)
224 224 else:
225 225 connection += ':%i' % port
226 226 socket.bind(connection)
227 227 return port
228 228
229 229
230 230 def main():
231 231 """ Main entry point for launching a kernel.
232 232 """
233 233 # Parse command line arguments.
234 234 parser = ArgumentParser()
235 235 parser.add_argument('--ip', type=str, default='127.0.0.1',
236 236 help='set the kernel\'s IP address [default: local]')
237 237 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
238 238 help='set the XREP channel port [default: random]')
239 239 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
240 240 help='set the PUB channel port [default: random]')
241 241 parser.add_argument('--req', type=int, metavar='PORT', default=0,
242 242 help='set the REQ channel port [default: random]')
243 243 if sys.platform == 'win32':
244 244 parser.add_argument('--parent', type=int, metavar='HANDLE',
245 245 default=0, help='kill this process if the process '
246 246 'with HANDLE dies')
247 247 else:
248 248 parser.add_argument('--parent', action='store_true',
249 249 help='kill this process if its parent dies')
250 250 namespace = parser.parse_args()
251 251
252 252 # Create a context, a session, and the kernel sockets.
253 253 print >>sys.__stdout__, "Starting the kernel..."
254 254 context = zmq.Context()
255 255 session = Session(username=u'kernel')
256 256
257 257 reply_socket = context.socket(zmq.XREP)
258 258 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
259 259 print >>sys.__stdout__, "XREP Channel on port", xrep_port
260 260
261 261 pub_socket = context.socket(zmq.PUB)
262 262 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
263 263 print >>sys.__stdout__, "PUB Channel on port", pub_port
264 264
265 265 req_socket = context.socket(zmq.XREQ)
266 266 req_port = bind_port(req_socket, namespace.ip, namespace.req)
267 267 print >>sys.__stdout__, "REQ Channel on port", req_port
268 268
269 269 # Redirect input streams and set a display hook.
270 270 sys.stdout = OutStream(session, pub_socket, u'stdout')
271 271 sys.stderr = OutStream(session, pub_socket, u'stderr')
272 272 sys.displayhook = DisplayHook(session, pub_socket)
273 273
274 274 # Create the kernel.
275 275 kernel = Kernel(
276 276 session=session, reply_socket=reply_socket,
277 277 pub_socket=pub_socket, req_socket=req_socket
278 278 )
279 279
280 280 # Configure this kernel/process to die on parent termination, if necessary.
281 281 if namespace.parent:
282 282 if sys.platform == 'win32':
283 283 poller = ExitPollerWindows(namespace.parent)
284 284 else:
285 285 poller = ExitPollerUnix()
286 286 poller.start()
287 287
288 288 # Start the kernel mainloop.
289 289 kernel.start()
290 290
291 291
292 292 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
293 293 """ Launches a localhost kernel, binding to the specified ports.
294 294
295 295 Parameters
296 296 ----------
297 297 xrep_port : int, optional
298 298 The port to use for XREP channel.
299 299
300 300 pub_port : int, optional
301 301 The port to use for the SUB channel.
302 302
303 303 req_port : int, optional
304 304 The port to use for the REQ (raw input) channel.
305 305
306 306 independent : bool, optional (default False)
307 307 If set, the kernel process is guaranteed to survive if this process
308 308 dies. If not set, an effort is made to ensure that the kernel is killed
309 309 when this process dies. Note that in this case it is still good practice
310 310 to kill kernels manually before exiting.
311 311
312 312 Returns
313 313 -------
314 314 A tuple of form:
315 315 (kernel_process, xrep_port, pub_port, req_port)
316 316 where kernel_process is a Popen object and the ports are integers.
317 317 """
318 318 import socket
319 319 from subprocess import Popen
320 320
321 321 # Find open ports as necessary.
322 322 ports = []
323 323 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
324 324 for i in xrange(ports_needed):
325 325 sock = socket.socket()
326 326 sock.bind(('', 0))
327 327 ports.append(sock)
328 328 for i, sock in enumerate(ports):
329 329 port = sock.getsockname()[1]
330 330 sock.close()
331 331 ports[i] = port
332 332 if xrep_port <= 0:
333 333 xrep_port = ports.pop(0)
334 334 if pub_port <= 0:
335 335 pub_port = ports.pop(0)
336 336 if req_port <= 0:
337 337 req_port = ports.pop(0)
338 338
339 339 # Spawn a kernel.
340 340 command = 'from IPython.zmq.ipkernel import main; main()'
341 341 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
342 342 '--pub', str(pub_port), '--req', str(req_port) ]
343 343 if independent:
344 344 if sys.platform == 'win32':
345 345 proc = Popen(['start', '/b'] + arguments, shell=True)
346 346 else:
347 347 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
348 348 else:
349 349 if sys.platform == 'win32':
350 350 from _subprocess import DuplicateHandle, GetCurrentProcess, \
351 351 DUPLICATE_SAME_ACCESS
352 352 pid = GetCurrentProcess()
353 353 handle = DuplicateHandle(pid, pid, pid, 0,
354 354 True, # Inheritable by new processes.
355 355 DUPLICATE_SAME_ACCESS)
356 356 proc = Popen(arguments + ['--parent', str(int(handle))])
357 357 else:
358 358 proc = Popen(arguments + ['--parent'])
359 359
360 360 return proc, xrep_port, pub_port, req_port
361 361
362 362
363 363 if __name__ == '__main__':
364 364 main()
General Comments 0
You need to be logged in to leave comments. Login now