##// END OF EJS Templates
add topic to display publisher, and fix set_parent for apply_requests
MinRK -
Show More
@@ -1,236 +1,237 b''
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 it handles registration, etc. and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 20 from getpass import getpass
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.external.ssh import tunnel
26 26 # internal
27 27 from IPython.utils.traitlets import (
28 28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 29 )
30 30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37 37 from IPython.zmq.ipkernel import Kernel
38 38
39 39 class EngineFactory(RegistrationFactory):
40 40 """IPython engine"""
41 41
42 42 # configurables:
43 43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 44 help="""The OutStream for handling stdout/err.
45 45 Typically 'IPython.zmq.iostream.OutStream'""")
46 46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 47 help="""The class for handling displayhook.
48 48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 49 location=Unicode(config=True,
50 50 help="""The location (an IP address) of the controller. This is
51 51 used for disambiguating URLs, to determine whether
52 52 loopback should be used to connect or the public address.""")
53 53 timeout=CFloat(2,config=True,
54 54 help="""The time (in seconds) to wait for the Controller to respond
55 55 to registration requests before giving up.""")
56 56 sshserver=Unicode(config=True,
57 57 help="""The SSH server to use for tunneling connections to the Controller.""")
58 58 sshkey=Unicode(config=True,
59 59 help="""The SSH private key file to use when tunneling connections to the Controller.""")
60 60 paramiko=Bool(sys.platform == 'win32', config=True,
61 61 help="""Whether to use paramiko instead of openssh for tunnels.""")
62 62
63 63 # not configurable:
64 64 user_ns=Dict()
65 65 id=Integer(allow_none=True)
66 66 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 67 kernel=Instance(Kernel)
68 68
69 69 bident = CBytes()
70 70 ident = Unicode()
71 71 def _ident_changed(self, name, old, new):
72 72 self.bident = cast_bytes(new)
73 73 using_ssh=Bool(False)
74 74
75 75
76 76 def __init__(self, **kwargs):
77 77 super(EngineFactory, self).__init__(**kwargs)
78 78 self.ident = self.session.session
79 79
80 80 def init_connector(self):
81 81 """construct connection function, which handles tunnels."""
82 82 self.using_ssh = bool(self.sshkey or self.sshserver)
83 83
84 84 if self.sshkey and not self.sshserver:
85 85 # We are using ssh directly to the controller, tunneling localhost to localhost
86 86 self.sshserver = self.url.split('://')[1].split(':')[0]
87 87
88 88 if self.using_ssh:
89 89 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
90 90 password=False
91 91 else:
92 92 password = getpass("SSH Password for %s: "%self.sshserver)
93 93 else:
94 94 password = False
95 95
96 96 def connect(s, url):
97 97 url = disambiguate_url(url, self.location)
98 98 if self.using_ssh:
99 99 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
100 100 return tunnel.tunnel_connection(s, url, self.sshserver,
101 101 keyfile=self.sshkey, paramiko=self.paramiko,
102 102 password=password,
103 103 )
104 104 else:
105 105 return s.connect(url)
106 106
107 107 def maybe_tunnel(url):
108 108 """like connect, but don't complete the connection (for use by heartbeat)"""
109 109 url = disambiguate_url(url, self.location)
110 110 if self.using_ssh:
111 111 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
112 112 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 113 keyfile=self.sshkey, paramiko=self.paramiko,
114 114 password=password,
115 115 )
116 116 return url
117 117 return connect, maybe_tunnel
118 118
119 119 def register(self):
120 120 """send the registration_request"""
121 121
122 122 self.log.info("Registering with controller at %s"%self.url)
123 123 ctx = self.context
124 124 connect,maybe_tunnel = self.init_connector()
125 125 reg = ctx.socket(zmq.DEALER)
126 126 reg.setsockopt(zmq.IDENTITY, self.bident)
127 127 connect(reg, self.url)
128 128 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129 129
130 130
131 131 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 132 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 133 # print (self.session.key)
134 134 self.session.send(self.registrar, "registration_request",content=content)
135 135
136 136 def complete_registration(self, msg, connect, maybe_tunnel):
137 137 # print msg
138 138 self._abort_dc.stop()
139 139 ctx = self.context
140 140 loop = self.loop
141 141 identity = self.bident
142 142 idents,msg = self.session.feed_identities(msg)
143 143 msg = Message(self.session.unserialize(msg))
144 144
145 145 if msg.content.status == 'ok':
146 146 self.id = int(msg.content.id)
147 147
148 148 # launch heartbeat
149 149 hb_addrs = msg.content.heartbeat
150 150
151 151 # possibly forward hb ports with tunnels
152 152 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
153 153 heart = Heart(*map(str, hb_addrs), heart_id=identity)
154 154 heart.start()
155 155
156 156 # create Shell Streams (MUX, Task, etc.):
157 157 queue_addr = msg.content.mux
158 158 shell_addrs = [ str(queue_addr) ]
159 159 task_addr = msg.content.task
160 160 if task_addr:
161 161 shell_addrs.append(str(task_addr))
162 162
163 163 # Uncomment this to go back to two-socket model
164 164 # shell_streams = []
165 165 # for addr in shell_addrs:
166 166 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 167 # stream.setsockopt(zmq.IDENTITY, identity)
168 168 # stream.connect(disambiguate_url(addr, self.location))
169 169 # shell_streams.append(stream)
170 170
171 171 # Now use only one shell stream for mux and tasks
172 172 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
173 173 stream.setsockopt(zmq.IDENTITY, identity)
174 174 shell_streams = [stream]
175 175 for addr in shell_addrs:
176 176 connect(stream, addr)
177 177 # end single stream-socket
178 178
179 179 # control stream:
180 180 control_addr = str(msg.content.control)
181 181 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
182 182 control_stream.setsockopt(zmq.IDENTITY, identity)
183 183 connect(control_stream, control_addr)
184 184
185 185 # create iopub stream:
186 186 iopub_addr = msg.content.iopub
187 187 iopub_socket = ctx.socket(zmq.PUB)
188 188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 189 connect(iopub_socket, iopub_addr)
190 190
191 191 # disable history:
192 192 self.config.HistoryManager.hist_file = ':memory:'
193 193
194 194 # Redirect input streams and set a display hook.
195 195 if self.out_stream_factory:
196 196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
197 197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
198 198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
199 199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
200 200 if self.display_hook_factory:
201 201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
202 202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
203 203
204 204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
205 205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
206 206 loop=loop, user_ns=self.user_ns, log=self.log)
207 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
207 208 self.kernel.start()
208 209
209 210
210 211 else:
211 212 self.log.fatal("Registration Failed: %s"%msg)
212 213 raise Exception("Registration Failed: %s"%msg)
213 214
214 215 self.log.info("Completed registration with id %i"%self.id)
215 216
216 217
217 218 def abort(self):
218 219 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
219 220 if self.url.startswith('127.'):
220 221 self.log.fatal("""
221 222 If the controller and engines are not on the same machine,
222 223 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
223 224 c.HubFactory.ip='*' # for all interfaces, internal and external
224 225 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
225 226 or tunnel connections via ssh.
226 227 """)
227 228 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
228 229 time.sleep(1)
229 230 sys.exit(255)
230 231
231 232 def start(self):
232 233 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
233 234 dc.start()
234 235 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
235 236 self._abort_dc.start()
236 237
@@ -1,917 +1,919 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.core import pylabtools
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.config.application import boolean_flag, catch_config_error
41 41 from IPython.core.application import ProfileDir
42 42 from IPython.core.error import StdinNotImplementedError
43 43 from IPython.core.shellapp import (
44 44 InteractiveShellApp, shell_flags, shell_aliases
45 45 )
46 46 from IPython.utils import io
47 47 from IPython.utils import py3compat
48 48 from IPython.utils.frame import extract_module_locals
49 49 from IPython.utils.jsonutil import json_clean
50 50 from IPython.utils.traitlets import (
51 51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 65 class Kernel(Configurable):
66 66
67 67 #---------------------------------------------------------------------------
68 68 # Kernel interface
69 69 #---------------------------------------------------------------------------
70 70
71 71 # attribute to override with a GUI
72 72 eventloop = Any(None)
73 73 def _eventloop_changed(self, name, old, new):
74 74 """schedule call to eventloop from IOLoop"""
75 75 loop = ioloop.IOLoop.instance()
76 76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77 77
78 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 79 session = Instance(Session)
80 80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 81 shell_streams = List()
82 82 control_stream = Instance(ZMQStream)
83 83 iopub_socket = Instance(zmq.Socket)
84 84 stdin_socket = Instance(zmq.Socket)
85 85 log = Instance(logging.Logger)
86 86
87 87 user_module = Any()
88 88 def _user_module_changed(self, name, old, new):
89 89 if self.shell is not None:
90 90 self.shell.user_module = new
91 91
92 92 user_ns = Dict(default_value=None)
93 93 def _user_ns_changed(self, name, old, new):
94 94 if self.shell is not None:
95 95 self.shell.user_ns = new
96 96 self.shell.init_user_ns()
97 97
98 98 # identities:
99 99 int_id = Integer(-1)
100 100 ident = Unicode()
101 101
102 102 def _ident_default(self):
103 103 return unicode(uuid.uuid4())
104 104
105 105
106 106 # Private interface
107 107
108 108 # Time to sleep after flushing the stdout/err buffers in each execute
109 109 # cycle. While this introduces a hard limit on the minimal latency of the
110 110 # execute cycle, it helps prevent output synchronization problems for
111 111 # clients.
112 112 # Units are in seconds. The minimum zmq latency on local host is probably
113 113 # ~150 microseconds, set this to 500us for now. We may need to increase it
114 114 # a little if it's not enough after more interactive testing.
115 115 _execute_sleep = Float(0.0005, config=True)
116 116
117 117 # Frequency of the kernel's event loop.
118 118 # Units are in seconds, kernel subclasses for GUI toolkits may need to
119 119 # adapt to milliseconds.
120 120 _poll_interval = Float(0.05, config=True)
121 121
122 122 # If the shutdown was requested over the network, we leave here the
123 123 # necessary reply message so it can be sent by our registered atexit
124 124 # handler. This ensures that the reply is only sent to clients truly at
125 125 # the end of our shutdown process (which happens after the underlying
126 126 # IPython shell's own shutdown).
127 127 _shutdown_message = None
128 128
129 129 # This is a dict of port number that the kernel is listening on. It is set
130 130 # by record_ports and used by connect_request.
131 131 _recorded_ports = Dict()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = ZMQInteractiveShell.instance(config=self.config,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 )
146 146 self.shell.displayhook.session = self.session
147 147 self.shell.displayhook.pub_socket = self.iopub_socket
148 148 self.shell.displayhook.topic = self._topic('pyout')
149 149 self.shell.display_pub.session = self.session
150 150 self.shell.display_pub.pub_socket = self.iopub_socket
151 151
152 152 # TMP - hack while developing
153 153 self.shell._reply_content = None
154 154
155 155 # Build dict of handlers for message types
156 156 msg_types = [ 'execute_request', 'complete_request',
157 157 'object_info_request', 'history_request',
158 158 'connect_request', 'shutdown_request',
159 159 'apply_request',
160 160 ]
161 161 self.shell_handlers = {}
162 162 for msg_type in msg_types:
163 163 self.shell_handlers[msg_type] = getattr(self, msg_type)
164 164
165 165 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
166 166 self.control_handlers = {}
167 167 for msg_type in control_msg_types:
168 168 self.control_handlers[msg_type] = getattr(self, msg_type)
169 169
170 170 def dispatch_control(self, msg):
171 171 """dispatch control requests"""
172 172 idents,msg = self.session.feed_identities(msg, copy=False)
173 173 try:
174 174 msg = self.session.unserialize(msg, content=True, copy=False)
175 175 except:
176 176 self.log.error("Invalid Control Message", exc_info=True)
177 177 return
178 178
179 179 self.log.debug("Control received: %s", msg)
180 180
181 181 header = msg['header']
182 182 msg_id = header['msg_id']
183 183 msg_type = header['msg_type']
184 184
185 185 handler = self.control_handlers.get(msg_type, None)
186 186 if handler is None:
187 187 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
188 188 else:
189 189 try:
190 190 handler(self.control_stream, idents, msg)
191 191 except Exception:
192 192 self.log.error("Exception in control handler:", exc_info=True)
193 193
194 194 def dispatch_shell(self, stream, msg):
195 195 """dispatch shell requests"""
196 196 # flush control requests first
197 197 if self.control_stream:
198 198 self.control_stream.flush()
199 199
200 200 idents,msg = self.session.feed_identities(msg, copy=False)
201 201 try:
202 202 msg = self.session.unserialize(msg, content=True, copy=False)
203 203 except:
204 204 self.log.error("Invalid Message", exc_info=True)
205 205 return
206 206
207 207 header = msg['header']
208 208 msg_id = header['msg_id']
209 209 msg_type = msg['header']['msg_type']
210 210
211 211 # Print some info about this message and leave a '--->' marker, so it's
212 212 # easier to trace visually the message chain when debugging. Each
213 213 # handler prints its message at the end.
214 214 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
215 215 self.log.debug(' Content: %s\n --->\n ', msg['content'])
216 216
217 217 if msg_id in self.aborted:
218 218 self.aborted.remove(msg_id)
219 219 # is it safe to assume a msg_id will not be resubmitted?
220 220 reply_type = msg_type.split('_')[0] + '_reply'
221 221 status = {'status' : 'aborted'}
222 222 sub = {'engine' : self.ident}
223 223 sub.update(status)
224 224 reply_msg = self.session.send(stream, reply_type, subheader=sub,
225 225 content=status, parent=msg, ident=idents)
226 226 return
227 227
228 228 handler = self.shell_handlers.get(msg_type, None)
229 229 if handler is None:
230 230 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
231 231 else:
232 232 # ensure default_int_handler during handler call
233 233 sig = signal(SIGINT, default_int_handler)
234 234 try:
235 235 handler(stream, idents, msg)
236 236 except Exception:
237 237 self.log.error("Exception in message handler:", exc_info=True)
238 238 finally:
239 239 signal(SIGINT, sig)
240 240
241 241 def enter_eventloop(self):
242 242 """enter eventloop"""
243 243 self.log.critical("entering eventloop")
244 244 # restore default_int_handler
245 245 signal(SIGINT, default_int_handler)
246 246 while self.eventloop is not None:
247 247 try:
248 248 self.eventloop(self)
249 249 except KeyboardInterrupt:
250 250 # Ctrl-C shouldn't crash the kernel
251 251 self.log.error("KeyboardInterrupt caught in kernel")
252 252 continue
253 253 else:
254 254 # eventloop exited cleanly, this means we should stop (right?)
255 255 self.eventloop = None
256 256 break
257 257 self.log.critical("exiting eventloop")
258 258 # if eventloop exits, IOLoop should stop
259 259 ioloop.IOLoop.instance().stop()
260 260
261 261 def start(self):
262 262 """register dispatchers for streams"""
263 263 self.shell.exit_now = False
264 264 if self.control_stream:
265 265 self.control_stream.on_recv(self.dispatch_control, copy=False)
266 266
267 267 def make_dispatcher(stream):
268 268 def dispatcher(msg):
269 269 return self.dispatch_shell(stream, msg)
270 270 return dispatcher
271 271
272 272 for s in self.shell_streams:
273 273 s.on_recv(make_dispatcher(s), copy=False)
274 274
275 275 def do_one_iteration(self):
276 276 """step eventloop just once"""
277 277 if self.control_stream:
278 278 self.control_stream.flush()
279 279 for stream in self.shell_streams:
280 280 # handle at most one request per iteration
281 281 stream.flush(zmq.POLLIN, 1)
282 282 stream.flush(zmq.POLLOUT)
283 283
284 284
285 285 def record_ports(self, ports):
286 286 """Record the ports that this kernel is using.
287 287
288 288 The creator of the Kernel instance must call this methods if they
289 289 want the :meth:`connect_request` method to return the port numbers.
290 290 """
291 291 self._recorded_ports = ports
292 292
293 293 #---------------------------------------------------------------------------
294 294 # Kernel request handlers
295 295 #---------------------------------------------------------------------------
296 296
297 297 def _make_subheader(self):
298 298 """init subheader dict, for execute/apply_reply"""
299 299 return {
300 300 'dependencies_met' : True,
301 301 'engine' : self.ident,
302 302 'started': datetime.now(),
303 303 }
304 304
305 305 def _publish_pyin(self, code, parent, execution_count):
306 306 """Publish the code request on the pyin stream."""
307 307
308 308 self.session.send(self.iopub_socket, u'pyin',
309 309 {u'code':code, u'execution_count': execution_count},
310 310 parent=parent, ident=self._topic('pyin')
311 311 )
312 312
313 313 def execute_request(self, stream, ident, parent):
314 314
315 315 self.session.send(self.iopub_socket,
316 316 u'status',
317 317 {u'execution_state':u'busy'},
318 318 parent=parent,
319 319 ident=self._topic('status'),
320 320 )
321 321
322 322 try:
323 323 content = parent[u'content']
324 324 code = content[u'code']
325 325 silent = content[u'silent']
326 326 except:
327 327 self.log.error("Got bad msg: ")
328 328 self.log.error("%s", parent)
329 329 return
330 330
331 331 sub = self._make_subheader()
332 332
333 333 shell = self.shell # we'll need this a lot here
334 334
335 335 # Replace raw_input. Note that is not sufficient to replace
336 336 # raw_input in the user namespace.
337 337 if content.get('allow_stdin', False):
338 338 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
339 339 else:
340 340 raw_input = lambda prompt='' : self._no_raw_input()
341 341
342 342 if py3compat.PY3:
343 343 __builtin__.input = raw_input
344 344 else:
345 345 __builtin__.raw_input = raw_input
346 346
347 347 # Set the parent message of the display hook and out streams.
348 348 shell.displayhook.set_parent(parent)
349 349 shell.display_pub.set_parent(parent)
350 350 sys.stdout.set_parent(parent)
351 351 sys.stderr.set_parent(parent)
352 352
353 353 # Re-broadcast our input for the benefit of listening clients, and
354 354 # start computing output
355 355 if not silent:
356 356 self._publish_pyin(code, parent, shell.execution_count)
357 357
358 358 reply_content = {}
359 359 try:
360 360 # FIXME: the shell calls the exception handler itself.
361 361 shell.run_cell(code, store_history=not silent, silent=silent)
362 362 except:
363 363 status = u'error'
364 364 # FIXME: this code right now isn't being used yet by default,
365 365 # because the run_cell() call above directly fires off exception
366 366 # reporting. This code, therefore, is only active in the scenario
367 367 # where runlines itself has an unhandled exception. We need to
368 368 # uniformize this, for all exception construction to come from a
369 369 # single location in the codbase.
370 370 etype, evalue, tb = sys.exc_info()
371 371 tb_list = traceback.format_exception(etype, evalue, tb)
372 372 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
373 373 else:
374 374 status = u'ok'
375 375
376 376 reply_content[u'status'] = status
377 377
378 378 # Return the execution counter so clients can display prompts
379 379 reply_content['execution_count'] = shell.execution_count - 1
380 380
381 381 # FIXME - fish exception info out of shell, possibly left there by
382 382 # runlines. We'll need to clean up this logic later.
383 383 if shell._reply_content is not None:
384 384 reply_content.update(shell._reply_content)
385 385 # reset after use
386 386 shell._reply_content = None
387 387
388 388 # At this point, we can tell whether the main code execution succeeded
389 389 # or not. If it did, we proceed to evaluate user_variables/expressions
390 390 if reply_content['status'] == 'ok':
391 391 reply_content[u'user_variables'] = \
392 392 shell.user_variables(content.get(u'user_variables', []))
393 393 reply_content[u'user_expressions'] = \
394 394 shell.user_expressions(content.get(u'user_expressions', {}))
395 395 else:
396 396 # If there was an error, don't even try to compute variables or
397 397 # expressions
398 398 reply_content[u'user_variables'] = {}
399 399 reply_content[u'user_expressions'] = {}
400 400
401 401 # Payloads should be retrieved regardless of outcome, so we can both
402 402 # recover partial output (that could have been generated early in a
403 403 # block, before an error) and clear the payload system always.
404 404 reply_content[u'payload'] = shell.payload_manager.read_payload()
405 405 # Be agressive about clearing the payload because we don't want
406 406 # it to sit in memory until the next execute_request comes in.
407 407 shell.payload_manager.clear_payload()
408 408
409 409 # Flush output before sending the reply.
410 410 sys.stdout.flush()
411 411 sys.stderr.flush()
412 412 # FIXME: on rare occasions, the flush doesn't seem to make it to the
413 413 # clients... This seems to mitigate the problem, but we definitely need
414 414 # to better understand what's going on.
415 415 if self._execute_sleep:
416 416 time.sleep(self._execute_sleep)
417 417
418 418 # Send the reply.
419 419 reply_content = json_clean(reply_content)
420 420
421 421 sub['status'] = reply_content['status']
422 422 if reply_content['status'] == 'error' and \
423 423 reply_content['ename'] == 'UnmetDependency':
424 424 sub['dependencies_met'] = False
425 425
426 426 reply_msg = self.session.send(stream, u'execute_reply',
427 427 reply_content, parent, subheader=sub,
428 428 ident=ident)
429 429
430 430 self.log.debug("%s", reply_msg)
431 431
432 432 if not silent and reply_msg['content']['status'] == u'error':
433 433 self._abort_queues()
434 434
435 435 self.session.send(self.iopub_socket,
436 436 u'status',
437 437 {u'execution_state':u'idle'},
438 438 parent=parent,
439 439 ident=self._topic('status'))
440 440
441 441 def complete_request(self, stream, ident, parent):
442 442 txt, matches = self._complete(parent)
443 443 matches = {'matches' : matches,
444 444 'matched_text' : txt,
445 445 'status' : 'ok'}
446 446 matches = json_clean(matches)
447 447 completion_msg = self.session.send(stream, 'complete_reply',
448 448 matches, parent, ident)
449 449 self.log.debug("%s", completion_msg)
450 450
451 451 def object_info_request(self, stream, ident, parent):
452 452 content = parent['content']
453 453 object_info = self.shell.object_inspect(content['oname'],
454 454 detail_level = content.get('detail_level', 0)
455 455 )
456 456 # Before we send this object over, we scrub it for JSON usage
457 457 oinfo = json_clean(object_info)
458 458 msg = self.session.send(stream, 'object_info_reply',
459 459 oinfo, parent, ident)
460 460 self.log.debug("%s", msg)
461 461
462 462 def history_request(self, stream, ident, parent):
463 463 # We need to pull these out, as passing **kwargs doesn't work with
464 464 # unicode keys before Python 2.6.5.
465 465 hist_access_type = parent['content']['hist_access_type']
466 466 raw = parent['content']['raw']
467 467 output = parent['content']['output']
468 468 if hist_access_type == 'tail':
469 469 n = parent['content']['n']
470 470 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
471 471 include_latest=True)
472 472
473 473 elif hist_access_type == 'range':
474 474 session = parent['content']['session']
475 475 start = parent['content']['start']
476 476 stop = parent['content']['stop']
477 477 hist = self.shell.history_manager.get_range(session, start, stop,
478 478 raw=raw, output=output)
479 479
480 480 elif hist_access_type == 'search':
481 481 pattern = parent['content']['pattern']
482 482 hist = self.shell.history_manager.search(pattern, raw=raw,
483 483 output=output)
484 484
485 485 else:
486 486 hist = []
487 487 hist = list(hist)
488 488 content = {'history' : hist}
489 489 content = json_clean(content)
490 490 msg = self.session.send(stream, 'history_reply',
491 491 content, parent, ident)
492 492 self.log.debug("Sending history reply with %i entries", len(hist))
493 493
494 494 def connect_request(self, stream, ident, parent):
495 495 if self._recorded_ports is not None:
496 496 content = self._recorded_ports.copy()
497 497 else:
498 498 content = {}
499 499 msg = self.session.send(stream, 'connect_reply',
500 500 content, parent, ident)
501 501 self.log.debug("%s", msg)
502 502
503 503 def shutdown_request(self, stream, ident, parent):
504 504 self.shell.exit_now = True
505 505 content = dict(status='ok')
506 506 content.update(parent['content'])
507 507 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
508 508 # same content, but different msg_id for broadcasting on IOPub
509 509 self._shutdown_message = self.session.msg(u'shutdown_reply',
510 510 content, parent
511 511 )
512 512
513 513 self._at_shutdown()
514 514 # call sys.exit after a short delay
515 515 loop = ioloop.IOLoop.instance()
516 516 loop.add_timeout(time.time()+0.1, loop.stop)
517 517
518 518 #---------------------------------------------------------------------------
519 519 # Engine methods
520 520 #---------------------------------------------------------------------------
521 521
522 522 def apply_request(self, stream, ident, parent):
523 523 try:
524 524 content = parent[u'content']
525 525 bufs = parent[u'buffers']
526 526 msg_id = parent['header']['msg_id']
527 527 except:
528 528 self.log.error("Got bad msg: %s", parent, exc_info=True)
529 529 return
530
531 # Set the parent message of the display hook and out streams.
532 self.shell.displayhook.set_parent(parent)
533 self.shell.display_pub.set_parent(parent)
534 sys.stdout.set_parent(parent)
535 sys.stderr.set_parent(parent)
536
530 537 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
531 538 # self.iopub_socket.send(pyin_msg)
532 539 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
533 540 sub = self._make_subheader()
534 541 try:
535 # allow for not overriding displayhook
536 if hasattr(sys.displayhook, 'set_parent'):
537 sys.displayhook.set_parent(parent)
538 sys.stdout.set_parent(parent)
539 sys.stderr.set_parent(parent)
540 542 working = self.shell.user_ns
541 543
542 544 prefix = "_"+str(msg_id).replace("-","")+"_"
543 545
544 546 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
545 547
546 548 fname = getattr(f, '__name__', 'f')
547 549
548 550 fname = prefix+"f"
549 551 argname = prefix+"args"
550 552 kwargname = prefix+"kwargs"
551 553 resultname = prefix+"result"
552 554
553 555 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
554 556 # print ns
555 557 working.update(ns)
556 558 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
557 559 try:
558 560 exec code in self.shell.user_global_ns, self.shell.user_ns
559 561 result = working.get(resultname)
560 562 finally:
561 563 for key in ns.iterkeys():
562 564 working.pop(key)
563 565
564 566 packed_result,buf = serialize_object(result)
565 567 result_buf = [packed_result]+buf
566 568 except:
567 569 exc_content = self._wrap_exception('apply')
568 570 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
569 571 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
570 572 ident=self._topic('pyerr'))
571 573 reply_content = exc_content
572 574 result_buf = []
573 575
574 576 if exc_content['ename'] == 'UnmetDependency':
575 577 sub['dependencies_met'] = False
576 578 else:
577 579 reply_content = {'status' : 'ok'}
578 580
579 581 # put 'ok'/'error' status in header, for scheduler introspection:
580 582 sub['status'] = reply_content['status']
581 583
582 584 # flush i/o
583 585 sys.stdout.flush()
584 586 sys.stderr.flush()
585 587
586 588 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
587 589 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
588 590
589 591 #---------------------------------------------------------------------------
590 592 # Control messages
591 593 #---------------------------------------------------------------------------
592 594
593 595 def abort_request(self, stream, ident, parent):
594 596 """abort a specifig msg by id"""
595 597 msg_ids = parent['content'].get('msg_ids', None)
596 598 if isinstance(msg_ids, basestring):
597 599 msg_ids = [msg_ids]
598 600 if not msg_ids:
599 601 self.abort_queues()
600 602 for mid in msg_ids:
601 603 self.aborted.add(str(mid))
602 604
603 605 content = dict(status='ok')
604 606 reply_msg = self.session.send(stream, 'abort_reply', content=content,
605 607 parent=parent, ident=ident)
606 608 self.log.debug("%s", reply_msg)
607 609
608 610 def clear_request(self, stream, idents, parent):
609 611 """Clear our namespace."""
610 612 self.shell.reset(False)
611 613 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
612 614 content = dict(status='ok'))
613 615
614 616
615 617 #---------------------------------------------------------------------------
616 618 # Protected interface
617 619 #---------------------------------------------------------------------------
618 620
619 621
620 622 def _wrap_exception(self, method=None):
621 623 # import here, because _wrap_exception is only used in parallel,
622 624 # and parallel has higher min pyzmq version
623 625 from IPython.parallel.error import wrap_exception
624 626 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
625 627 content = wrap_exception(e_info)
626 628 return content
627 629
628 630 def _topic(self, topic):
629 631 """prefixed topic for IOPub messages"""
630 632 if self.int_id >= 0:
631 633 base = "engine.%i" % self.int_id
632 634 else:
633 635 base = "kernel.%s" % self.ident
634 636
635 637 return py3compat.cast_bytes("%s.%s" % (base, topic))
636 638
637 639 def _abort_queues(self):
638 640 for stream in self.shell_streams:
639 641 if stream:
640 642 self._abort_queue(stream)
641 643
642 644 def _abort_queue(self, stream):
643 645 poller = zmq.Poller()
644 646 poller.register(stream.socket, zmq.POLLIN)
645 647 while True:
646 648 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
647 649 if msg is None:
648 650 return
649 651
650 652 self.log.info("Aborting:")
651 653 self.log.info("%s", msg)
652 654 msg_type = msg['header']['msg_type']
653 655 reply_type = msg_type.split('_')[0] + '_reply'
654 656
655 657 status = {'status' : 'aborted'}
656 658 sub = {'engine' : self.ident}
657 659 sub.update(status)
658 660 reply_msg = self.session.send(stream, reply_type, subheader=sub,
659 661 content=status, parent=msg, ident=idents)
660 662 self.log.debug("%s", reply_msg)
661 663 # We need to wait a bit for requests to come in. This can probably
662 664 # be set shorter for true asynchronous clients.
663 665 poller.poll(50)
664 666
665 667
666 668 def _no_raw_input(self):
667 669 """Raise StdinNotImplentedError if active frontend doesn't support
668 670 stdin."""
669 671 raise StdinNotImplementedError("raw_input was called, but this "
670 672 "frontend does not support stdin.")
671 673
672 674 def _raw_input(self, prompt, ident, parent):
673 675 # Flush output before making the request.
674 676 sys.stderr.flush()
675 677 sys.stdout.flush()
676 678
677 679 # Send the input request.
678 680 content = json_clean(dict(prompt=prompt))
679 681 self.session.send(self.stdin_socket, u'input_request', content, parent,
680 682 ident=ident)
681 683
682 684 # Await a response.
683 685 while True:
684 686 try:
685 687 ident, reply = self.session.recv(self.stdin_socket, 0)
686 688 except Exception:
687 689 self.log.warn("Invalid Message:", exc_info=True)
688 690 else:
689 691 break
690 692 try:
691 693 value = reply['content']['value']
692 694 except:
693 695 self.log.error("Got bad raw_input reply: ")
694 696 self.log.error("%s", parent)
695 697 value = ''
696 698 if value == '\x04':
697 699 # EOF
698 700 raise EOFError
699 701 return value
700 702
701 703 def _complete(self, msg):
702 704 c = msg['content']
703 705 try:
704 706 cpos = int(c['cursor_pos'])
705 707 except:
706 708 # If we don't get something that we can convert to an integer, at
707 709 # least attempt the completion guessing the cursor is at the end of
708 710 # the text, if there's any, and otherwise of the line
709 711 cpos = len(c['text'])
710 712 if cpos==0:
711 713 cpos = len(c['line'])
712 714 return self.shell.complete(c['text'], c['line'], cpos)
713 715
714 716 def _object_info(self, context):
715 717 symbol, leftover = self._symbol_from_context(context)
716 718 if symbol is not None and not leftover:
717 719 doc = getattr(symbol, '__doc__', '')
718 720 else:
719 721 doc = ''
720 722 object_info = dict(docstring = doc)
721 723 return object_info
722 724
723 725 def _symbol_from_context(self, context):
724 726 if not context:
725 727 return None, context
726 728
727 729 base_symbol_string = context[0]
728 730 symbol = self.shell.user_ns.get(base_symbol_string, None)
729 731 if symbol is None:
730 732 symbol = __builtin__.__dict__.get(base_symbol_string, None)
731 733 if symbol is None:
732 734 return None, context
733 735
734 736 context = context[1:]
735 737 for i, name in enumerate(context):
736 738 new_symbol = getattr(symbol, name, None)
737 739 if new_symbol is None:
738 740 return symbol, context[i:]
739 741 else:
740 742 symbol = new_symbol
741 743
742 744 return symbol, []
743 745
744 746 def _at_shutdown(self):
745 747 """Actions taken at shutdown by the kernel, called by python's atexit.
746 748 """
747 749 # io.rprint("Kernel at_shutdown") # dbg
748 750 if self._shutdown_message is not None:
749 751 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
750 752 self.log.debug("%s", self._shutdown_message)
751 753 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
752 754
753 755 #-----------------------------------------------------------------------------
754 756 # Aliases and Flags for the IPKernelApp
755 757 #-----------------------------------------------------------------------------
756 758
757 759 flags = dict(kernel_flags)
758 760 flags.update(shell_flags)
759 761
760 762 addflag = lambda *args: flags.update(boolean_flag(*args))
761 763
762 764 flags['pylab'] = (
763 765 {'IPKernelApp' : {'pylab' : 'auto'}},
764 766 """Pre-load matplotlib and numpy for interactive use with
765 767 the default matplotlib backend."""
766 768 )
767 769
768 770 aliases = dict(kernel_aliases)
769 771 aliases.update(shell_aliases)
770 772
771 773 # it's possible we don't want short aliases for *all* of these:
772 774 aliases.update(dict(
773 775 pylab='IPKernelApp.pylab',
774 776 ))
775 777
776 778 #-----------------------------------------------------------------------------
777 779 # The IPKernelApp class
778 780 #-----------------------------------------------------------------------------
779 781
780 782 class IPKernelApp(KernelApp, InteractiveShellApp):
781 783 name = 'ipkernel'
782 784
783 785 aliases = Dict(aliases)
784 786 flags = Dict(flags)
785 787 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
786 788
787 789 # configurables
788 790 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
789 791 config=True,
790 792 help="""Pre-load matplotlib and numpy for interactive use,
791 793 selecting a particular matplotlib backend and loop integration.
792 794 """
793 795 )
794 796
795 797 @catch_config_error
796 798 def initialize(self, argv=None):
797 799 super(IPKernelApp, self).initialize(argv)
798 800 self.init_path()
799 801 self.init_shell()
800 802 self.init_extensions()
801 803 self.init_code()
802 804
803 805 def init_kernel(self):
804 806
805 807 shell_stream = ZMQStream(self.shell_socket)
806 808
807 809 kernel = Kernel(config=self.config, session=self.session,
808 810 shell_streams=[shell_stream],
809 811 iopub_socket=self.iopub_socket,
810 812 stdin_socket=self.stdin_socket,
811 813 log=self.log,
812 814 profile_dir=self.profile_dir,
813 815 )
814 816 self.kernel = kernel
815 817 kernel.record_ports(self.ports)
816 818 shell = kernel.shell
817 819 if self.pylab:
818 820 try:
819 821 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
820 822 shell.enable_pylab(gui, import_all=self.pylab_import_all)
821 823 except Exception:
822 824 self.log.error("Pylab initialization failed", exc_info=True)
823 825 # print exception straight to stdout, because normally
824 826 # _showtraceback associates the reply with an execution,
825 827 # which means frontends will never draw it, as this exception
826 828 # is not associated with any execute request.
827 829
828 830 # replace pyerr-sending traceback with stdout
829 831 _showtraceback = shell._showtraceback
830 832 def print_tb(etype, evalue, stb):
831 833 print ("Error initializing pylab, pylab mode will not "
832 834 "be active", file=io.stderr)
833 835 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
834 836 shell._showtraceback = print_tb
835 837
836 838 # send the traceback over stdout
837 839 shell.showtraceback(tb_offset=0)
838 840
839 841 # restore proper _showtraceback method
840 842 shell._showtraceback = _showtraceback
841 843
842 844
843 845 def init_shell(self):
844 846 self.shell = self.kernel.shell
845 847 self.shell.configurables.append(self)
846 848
847 849
848 850 #-----------------------------------------------------------------------------
849 851 # Kernel main and launch functions
850 852 #-----------------------------------------------------------------------------
851 853
852 854 def launch_kernel(*args, **kwargs):
853 855 """Launches a localhost IPython kernel, binding to the specified ports.
854 856
855 857 This function simply calls entry_point.base_launch_kernel with the right
856 858 first command to start an ipkernel. See base_launch_kernel for arguments.
857 859
858 860 Returns
859 861 -------
860 862 A tuple of form:
861 863 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
862 864 where kernel_process is a Popen object and the ports are integers.
863 865 """
864 866 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
865 867 *args, **kwargs)
866 868
867 869
868 870 def embed_kernel(module=None, local_ns=None, **kwargs):
869 871 """Embed and start an IPython kernel in a given scope.
870 872
871 873 Parameters
872 874 ----------
873 875 module : ModuleType, optional
874 876 The module to load into IPython globals (default: caller)
875 877 local_ns : dict, optional
876 878 The namespace to load into IPython user namespace (default: caller)
877 879
878 880 kwargs : various, optional
879 881 Further keyword args are relayed to the KernelApp constructor,
880 882 allowing configuration of the Kernel. Will only have an effect
881 883 on the first embed_kernel call for a given process.
882 884
883 885 """
884 886 # get the app if it exists, or set it up if it doesn't
885 887 if IPKernelApp.initialized():
886 888 app = IPKernelApp.instance()
887 889 else:
888 890 app = IPKernelApp.instance(**kwargs)
889 891 app.initialize([])
890 892 # Undo unnecessary sys module mangling from init_sys_modules.
891 893 # This would not be necessary if we could prevent it
892 894 # in the first place by using a different InteractiveShell
893 895 # subclass, as in the regular embed case.
894 896 main = app.kernel.shell._orig_sys_modules_main_mod
895 897 if main is not None:
896 898 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
897 899
898 900 # load the calling scope if not given
899 901 (caller_module, caller_locals) = extract_module_locals(1)
900 902 if module is None:
901 903 module = caller_module
902 904 if local_ns is None:
903 905 local_ns = caller_locals
904 906
905 907 app.kernel.user_module = module
906 908 app.kernel.user_ns = local_ns
907 909 app.start()
908 910
909 911 def main():
910 912 """Run an IPKernel as an application"""
911 913 app = IPKernelApp.instance()
912 914 app.initialize()
913 915 app.start()
914 916
915 917
916 918 if __name__ == '__main__':
917 919 main()
@@ -1,540 +1,541 b''
1 1 """A ZMQ-based subclass of InteractiveShell.
2 2
3 3 This code is meant to ease the refactoring of the base InteractiveShell into
4 4 something with a cleaner architecture for 2-process use, without actually
5 5 breaking InteractiveShell itself. So we're doing something a bit ugly, where
6 6 we subclass and override what we want to fix. Once this is working well, we
7 7 can go back to the base class and refactor the code for a cleaner inheritance
8 8 implementation that doesn't rely on so much monkeypatching.
9 9
10 10 But this lets us maintain a fully working IPython as we develop the new
11 11 machinery. This should thus be thought of as scaffolding.
12 12 """
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Stdlib
19 19 import inspect
20 20 import os
21 21 import sys
22 22 import time
23 23 from subprocess import Popen, PIPE
24 24
25 25 # System library imports
26 26 from zmq.eventloop import ioloop
27 27
28 28 # Our own
29 29 from IPython.core.interactiveshell import (
30 30 InteractiveShell, InteractiveShellABC
31 31 )
32 32 from IPython.core import page, pylabtools
33 33 from IPython.core.autocall import ZMQExitAutocall
34 34 from IPython.core.displaypub import DisplayPublisher
35 35 from IPython.core.macro import Macro
36 36 from IPython.core.magic import MacroToEdit
37 37 from IPython.core.payloadpage import install_payload_page
38 38 from IPython.lib.kernel import (
39 39 get_connection_file, get_connection_info, connect_qtconsole
40 40 )
41 41 from IPython.testing.skipdoctest import skip_doctest
42 42 from IPython.utils import io
43 43 from IPython.utils.jsonutil import json_clean
44 44 from IPython.utils.path import get_py_filename
45 45 from IPython.utils.process import arg_split
46 from IPython.utils.traitlets import Instance, Type, Dict, CBool
46 from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes
47 47 from IPython.utils.warn import warn, error
48 48 from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary
49 49 from IPython.zmq.session import extract_header
50 50 from session import Session
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Functions and classes
55 55 #-----------------------------------------------------------------------------
56 56
57 57 class ZMQDisplayPublisher(DisplayPublisher):
58 58 """A display publisher that publishes data using a ZeroMQ PUB socket."""
59 59
60 60 session = Instance(Session)
61 61 pub_socket = Instance('zmq.Socket')
62 62 parent_header = Dict({})
63 topic = CBytes(b'displaypub')
63 64
64 65 def set_parent(self, parent):
65 66 """Set the parent for outbound messages."""
66 67 self.parent_header = extract_header(parent)
67 68
68 69 def _flush_streams(self):
69 70 """flush IO Streams prior to display"""
70 71 sys.stdout.flush()
71 72 sys.stderr.flush()
72 73
73 74 def publish(self, source, data, metadata=None):
74 75 self._flush_streams()
75 76 if metadata is None:
76 77 metadata = {}
77 78 self._validate_data(source, data, metadata)
78 79 content = {}
79 80 content['source'] = source
80 81 _encode_binary(data)
81 82 content['data'] = data
82 83 content['metadata'] = metadata
83 84 self.session.send(
84 85 self.pub_socket, u'display_data', json_clean(content),
85 parent=self.parent_header
86 parent=self.parent_header, ident=self.topic,
86 87 )
87 88
88 89 def clear_output(self, stdout=True, stderr=True, other=True):
89 90 content = dict(stdout=stdout, stderr=stderr, other=other)
90 91
91 92 if stdout:
92 93 print('\r', file=sys.stdout, end='')
93 94 if stderr:
94 95 print('\r', file=sys.stderr, end='')
95 96
96 97 self._flush_streams()
97 98
98 99 self.session.send(
99 100 self.pub_socket, u'clear_output', content,
100 parent=self.parent_header
101 parent=self.parent_header, ident=self.topic,
101 102 )
102 103
103 104 class ZMQInteractiveShell(InteractiveShell):
104 105 """A subclass of InteractiveShell for ZMQ."""
105 106
106 107 displayhook_class = Type(ZMQShellDisplayHook)
107 108 display_pub_class = Type(ZMQDisplayPublisher)
108 109
109 110 # Override the traitlet in the parent class, because there's no point using
110 111 # readline for the kernel. Can be removed when the readline code is moved
111 112 # to the terminal frontend.
112 113 colors_force = CBool(True)
113 114 readline_use = CBool(False)
114 115 # autoindent has no meaning in a zmqshell, and attempting to enable it
115 116 # will print a warning in the absence of readline.
116 117 autoindent = CBool(False)
117 118
118 119 exiter = Instance(ZMQExitAutocall)
119 120 def _exiter_default(self):
120 121 return ZMQExitAutocall(self)
121 122
122 123 def _exit_now_changed(self, name, old, new):
123 124 """stop eventloop when exit_now fires"""
124 125 if new:
125 126 loop = ioloop.IOLoop.instance()
126 127 loop.add_timeout(time.time()+0.1, loop.stop)
127 128
128 129 keepkernel_on_exit = None
129 130
130 131 # Over ZeroMQ, GUI control isn't done with PyOS_InputHook as there is no
131 132 # interactive input being read; we provide event loop support in ipkernel
132 133 from .eventloops import enable_gui
133 134 enable_gui = staticmethod(enable_gui)
134 135
135 136 def init_environment(self):
136 137 """Configure the user's environment.
137 138
138 139 """
139 140 env = os.environ
140 141 # These two ensure 'ls' produces nice coloring on BSD-derived systems
141 142 env['TERM'] = 'xterm-color'
142 143 env['CLICOLOR'] = '1'
143 144 # Since normal pagers don't work at all (over pexpect we don't have
144 145 # single-key control of the subprocess), try to disable paging in
145 146 # subprocesses as much as possible.
146 147 env['PAGER'] = 'cat'
147 148 env['GIT_PAGER'] = 'cat'
148 149
149 150 # And install the payload version of page.
150 151 install_payload_page()
151 152
152 153 def auto_rewrite_input(self, cmd):
153 154 """Called to show the auto-rewritten input for autocall and friends.
154 155
155 156 FIXME: this payload is currently not correctly processed by the
156 157 frontend.
157 158 """
158 159 new = self.prompt_manager.render('rewrite') + cmd
159 160 payload = dict(
160 161 source='IPython.zmq.zmqshell.ZMQInteractiveShell.auto_rewrite_input',
161 162 transformed_input=new,
162 163 )
163 164 self.payload_manager.write_payload(payload)
164 165
165 166 def ask_exit(self):
166 167 """Engage the exit actions."""
167 168 self.exit_now = True
168 169 payload = dict(
169 170 source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit',
170 171 exit=True,
171 172 keepkernel=self.keepkernel_on_exit,
172 173 )
173 174 self.payload_manager.write_payload(payload)
174 175
175 176 def _showtraceback(self, etype, evalue, stb):
176 177
177 178 exc_content = {
178 179 u'traceback' : stb,
179 180 u'ename' : unicode(etype.__name__),
180 181 u'evalue' : unicode(evalue)
181 182 }
182 183
183 184 dh = self.displayhook
184 185 # Send exception info over pub socket for other clients than the caller
185 186 # to pick up
186 187 topic = None
187 188 if dh.topic:
188 189 topic = dh.topic.replace(b'pyout', b'pyerr')
189 190
190 191 exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header, ident=topic)
191 192
192 193 # FIXME - Hack: store exception info in shell object. Right now, the
193 194 # caller is reading this info after the fact, we need to fix this logic
194 195 # to remove this hack. Even uglier, we need to store the error status
195 196 # here, because in the main loop, the logic that sets it is being
196 197 # skipped because runlines swallows the exceptions.
197 198 exc_content[u'status'] = u'error'
198 199 self._reply_content = exc_content
199 200 # /FIXME
200 201
201 202 return exc_content
202 203
203 204 #------------------------------------------------------------------------
204 205 # Magic overrides
205 206 #------------------------------------------------------------------------
206 207 # Once the base class stops inheriting from magic, this code needs to be
207 208 # moved into a separate machinery as well. For now, at least isolate here
208 209 # the magics which this class needs to implement differently from the base
209 210 # class, or that are unique to it.
210 211
211 212 def magic_doctest_mode(self,parameter_s=''):
212 213 """Toggle doctest mode on and off.
213 214
214 215 This mode is intended to make IPython behave as much as possible like a
215 216 plain Python shell, from the perspective of how its prompts, exceptions
216 217 and output look. This makes it easy to copy and paste parts of a
217 218 session into doctests. It does so by:
218 219
219 220 - Changing the prompts to the classic ``>>>`` ones.
220 221 - Changing the exception reporting mode to 'Plain'.
221 222 - Disabling pretty-printing of output.
222 223
223 224 Note that IPython also supports the pasting of code snippets that have
224 225 leading '>>>' and '...' prompts in them. This means that you can paste
225 226 doctests from files or docstrings (even if they have leading
226 227 whitespace), and the code will execute correctly. You can then use
227 228 '%history -t' to see the translated history; this will give you the
228 229 input after removal of all the leading prompts and whitespace, which
229 230 can be pasted back into an editor.
230 231
231 232 With these features, you can switch into this mode easily whenever you
232 233 need to do testing and changes to doctests, without having to leave
233 234 your existing IPython session.
234 235 """
235 236
236 237 from IPython.utils.ipstruct import Struct
237 238
238 239 # Shorthands
239 240 shell = self.shell
240 241 disp_formatter = self.shell.display_formatter
241 242 ptformatter = disp_formatter.formatters['text/plain']
242 243 # dstore is a data store kept in the instance metadata bag to track any
243 244 # changes we make, so we can undo them later.
244 245 dstore = shell.meta.setdefault('doctest_mode', Struct())
245 246 save_dstore = dstore.setdefault
246 247
247 248 # save a few values we'll need to recover later
248 249 mode = save_dstore('mode', False)
249 250 save_dstore('rc_pprint', ptformatter.pprint)
250 251 save_dstore('rc_plain_text_only',disp_formatter.plain_text_only)
251 252 save_dstore('xmode', shell.InteractiveTB.mode)
252 253
253 254 if mode == False:
254 255 # turn on
255 256 ptformatter.pprint = False
256 257 disp_formatter.plain_text_only = True
257 258 shell.magic_xmode('Plain')
258 259 else:
259 260 # turn off
260 261 ptformatter.pprint = dstore.rc_pprint
261 262 disp_formatter.plain_text_only = dstore.rc_plain_text_only
262 263 shell.magic_xmode(dstore.xmode)
263 264
264 265 # Store new mode and inform on console
265 266 dstore.mode = bool(1-int(mode))
266 267 mode_label = ['OFF','ON'][dstore.mode]
267 268 print('Doctest mode is:', mode_label)
268 269
269 270 # Send the payload back so that clients can modify their prompt display
270 271 payload = dict(
271 272 source='IPython.zmq.zmqshell.ZMQInteractiveShell.magic_doctest_mode',
272 273 mode=dstore.mode)
273 274 self.payload_manager.write_payload(payload)
274 275
275 276 @skip_doctest
276 277 def magic_edit(self,parameter_s='',last_call=['','']):
277 278 """Bring up an editor and execute the resulting code.
278 279
279 280 Usage:
280 281 %edit [options] [args]
281 282
282 283 %edit runs an external text editor. You will need to set the command for
283 284 this editor via the ``TerminalInteractiveShell.editor`` option in your
284 285 configuration file before it will work.
285 286
286 287 This command allows you to conveniently edit multi-line code right in
287 288 your IPython session.
288 289
289 290 If called without arguments, %edit opens up an empty editor with a
290 291 temporary file and will execute the contents of this file when you
291 292 close it (don't forget to save it!).
292 293
293 294
294 295 Options:
295 296
296 297 -n <number>: open the editor at a specified line number. By default,
297 298 the IPython editor hook uses the unix syntax 'editor +N filename', but
298 299 you can configure this by providing your own modified hook if your
299 300 favorite editor supports line-number specifications with a different
300 301 syntax.
301 302
302 303 -p: this will call the editor with the same data as the previous time
303 304 it was used, regardless of how long ago (in your current session) it
304 305 was.
305 306
306 307 -r: use 'raw' input. This option only applies to input taken from the
307 308 user's history. By default, the 'processed' history is used, so that
308 309 magics are loaded in their transformed version to valid Python. If
309 310 this option is given, the raw input as typed as the command line is
310 311 used instead. When you exit the editor, it will be executed by
311 312 IPython's own processor.
312 313
313 314 -x: do not execute the edited code immediately upon exit. This is
314 315 mainly useful if you are editing programs which need to be called with
315 316 command line arguments, which you can then do using %run.
316 317
317 318
318 319 Arguments:
319 320
320 321 If arguments are given, the following possibilites exist:
321 322
322 323 - The arguments are numbers or pairs of colon-separated numbers (like
323 324 1 4:8 9). These are interpreted as lines of previous input to be
324 325 loaded into the editor. The syntax is the same of the %macro command.
325 326
326 327 - If the argument doesn't start with a number, it is evaluated as a
327 328 variable and its contents loaded into the editor. You can thus edit
328 329 any string which contains python code (including the result of
329 330 previous edits).
330 331
331 332 - If the argument is the name of an object (other than a string),
332 333 IPython will try to locate the file where it was defined and open the
333 334 editor at the point where it is defined. You can use `%edit function`
334 335 to load an editor exactly at the point where 'function' is defined,
335 336 edit it and have the file be executed automatically.
336 337
337 338 If the object is a macro (see %macro for details), this opens up your
338 339 specified editor with a temporary file containing the macro's data.
339 340 Upon exit, the macro is reloaded with the contents of the file.
340 341
341 342 Note: opening at an exact line is only supported under Unix, and some
342 343 editors (like kedit and gedit up to Gnome 2.8) do not understand the
343 344 '+NUMBER' parameter necessary for this feature. Good editors like
344 345 (X)Emacs, vi, jed, pico and joe all do.
345 346
346 347 - If the argument is not found as a variable, IPython will look for a
347 348 file with that name (adding .py if necessary) and load it into the
348 349 editor. It will execute its contents with execfile() when you exit,
349 350 loading any code in the file into your interactive namespace.
350 351
351 352 After executing your code, %edit will return as output the code you
352 353 typed in the editor (except when it was an existing file). This way
353 354 you can reload the code in further invocations of %edit as a variable,
354 355 via _<NUMBER> or Out[<NUMBER>], where <NUMBER> is the prompt number of
355 356 the output.
356 357
357 358 Note that %edit is also available through the alias %ed.
358 359
359 360 This is an example of creating a simple function inside the editor and
360 361 then modifying it. First, start up the editor:
361 362
362 363 In [1]: ed
363 364 Editing... done. Executing edited code...
364 365 Out[1]: 'def foo():n print "foo() was defined in an editing session"n'
365 366
366 367 We can then call the function foo():
367 368
368 369 In [2]: foo()
369 370 foo() was defined in an editing session
370 371
371 372 Now we edit foo. IPython automatically loads the editor with the
372 373 (temporary) file where foo() was previously defined:
373 374
374 375 In [3]: ed foo
375 376 Editing... done. Executing edited code...
376 377
377 378 And if we call foo() again we get the modified version:
378 379
379 380 In [4]: foo()
380 381 foo() has now been changed!
381 382
382 383 Here is an example of how to edit a code snippet successive
383 384 times. First we call the editor:
384 385
385 386 In [5]: ed
386 387 Editing... done. Executing edited code...
387 388 hello
388 389 Out[5]: "print 'hello'n"
389 390
390 391 Now we call it again with the previous output (stored in _):
391 392
392 393 In [6]: ed _
393 394 Editing... done. Executing edited code...
394 395 hello world
395 396 Out[6]: "print 'hello world'n"
396 397
397 398 Now we call it with the output #8 (stored in _8, also as Out[8]):
398 399
399 400 In [7]: ed _8
400 401 Editing... done. Executing edited code...
401 402 hello again
402 403 Out[7]: "print 'hello again'n"
403 404 """
404 405
405 406 opts,args = self.parse_options(parameter_s,'prn:')
406 407
407 408 try:
408 409 filename, lineno, _ = self._find_edit_target(args, opts, last_call)
409 410 except MacroToEdit as e:
410 411 # TODO: Implement macro editing over 2 processes.
411 412 print("Macro editing not yet implemented in 2-process model.")
412 413 return
413 414
414 415 # Make sure we send to the client an absolute path, in case the working
415 416 # directory of client and kernel don't match
416 417 filename = os.path.abspath(filename)
417 418
418 419 payload = {
419 420 'source' : 'IPython.zmq.zmqshell.ZMQInteractiveShell.edit_magic',
420 421 'filename' : filename,
421 422 'line_number' : lineno
422 423 }
423 424 self.payload_manager.write_payload(payload)
424 425
425 426 # A few magics that are adapted to the specifics of using pexpect and a
426 427 # remote terminal
427 428
428 429 def magic_clear(self, arg_s):
429 430 """Clear the terminal."""
430 431 if os.name == 'posix':
431 432 self.shell.system("clear")
432 433 else:
433 434 self.shell.system("cls")
434 435
435 436 if os.name == 'nt':
436 437 # This is the usual name in windows
437 438 magic_cls = magic_clear
438 439
439 440 # Terminal pagers won't work over pexpect, but we do have our own pager
440 441
441 442 def magic_less(self, arg_s):
442 443 """Show a file through the pager.
443 444
444 445 Files ending in .py are syntax-highlighted."""
445 446 cont = open(arg_s).read()
446 447 if arg_s.endswith('.py'):
447 448 cont = self.shell.pycolorize(cont)
448 449 page.page(cont)
449 450
450 451 magic_more = magic_less
451 452
452 453 # Man calls a pager, so we also need to redefine it
453 454 if os.name == 'posix':
454 455 def magic_man(self, arg_s):
455 456 """Find the man page for the given command and display in pager."""
456 457 page.page(self.shell.getoutput('man %s | col -b' % arg_s,
457 458 split=False))
458 459
459 460 # FIXME: this is specific to the GUI, so we should let the gui app load
460 461 # magics at startup that are only for the gui. Once the gui app has proper
461 462 # profile and configuration management, we can have it initialize a kernel
462 463 # with a special config file that provides these.
463 464 def magic_guiref(self, arg_s):
464 465 """Show a basic reference about the GUI console."""
465 466 from IPython.core.usage import gui_reference
466 467 page.page(gui_reference, auto_html=True)
467 468
468 469 def magic_connect_info(self, arg_s):
469 470 """Print information for connecting other clients to this kernel
470 471
471 472 It will print the contents of this session's connection file, as well as
472 473 shortcuts for local clients.
473 474
474 475 In the simplest case, when called from the most recently launched kernel,
475 476 secondary clients can be connected, simply with:
476 477
477 478 $> ipython <app> --existing
478 479
479 480 """
480 481
481 482 from IPython.core.application import BaseIPythonApplication as BaseIPApp
482 483
483 484 if BaseIPApp.initialized():
484 485 app = BaseIPApp.instance()
485 486 security_dir = app.profile_dir.security_dir
486 487 profile = app.profile
487 488 else:
488 489 profile = 'default'
489 490 security_dir = ''
490 491
491 492 try:
492 493 connection_file = get_connection_file()
493 494 info = get_connection_info(unpack=False)
494 495 except Exception as e:
495 496 error("Could not get connection info: %r" % e)
496 497 return
497 498
498 499 # add profile flag for non-default profile
499 500 profile_flag = "--profile %s" % profile if profile != 'default' else ""
500 501
501 502 # if it's in the security dir, truncate to basename
502 503 if security_dir == os.path.dirname(connection_file):
503 504 connection_file = os.path.basename(connection_file)
504 505
505 506
506 507 print (info + '\n')
507 508 print ("Paste the above JSON into a file, and connect with:\n"
508 509 " $> ipython <app> --existing <file>\n"
509 510 "or, if you are local, you can connect with just:\n"
510 511 " $> ipython <app> --existing {0} {1}\n"
511 512 "or even just:\n"
512 513 " $> ipython <app> --existing {1}\n"
513 514 "if this is the most recent IPython session you have started.".format(
514 515 connection_file, profile_flag
515 516 )
516 517 )
517 518
518 519 def magic_qtconsole(self, arg_s):
519 520 """Open a qtconsole connected to this kernel.
520 521
521 522 Useful for connecting a qtconsole to running notebooks, for better
522 523 debugging.
523 524 """
524 525 try:
525 526 p = connect_qtconsole(argv=arg_split(arg_s, os.name=='posix'))
526 527 except Exception as e:
527 528 error("Could not start qtconsole: %r" % e)
528 529 return
529 530
530 531 def set_next_input(self, text):
531 532 """Send the specified text to the frontend to be presented at the next
532 533 input cell."""
533 534 payload = dict(
534 535 source='IPython.zmq.zmqshell.ZMQInteractiveShell.set_next_input',
535 536 text=text
536 537 )
537 538 self.payload_manager.write_payload(payload)
538 539
539 540
540 541 InteractiveShellABC.register(ZMQInteractiveShell)
General Comments 0
You need to be logged in to leave comments. Login now