##// END OF EJS Templates
Separate IPython kernel implementation from kernel machinery
Thomas Kluyver -
Show More
@@ -1,4 +1,286
1 """The IPython kernel implementation"""
2
3 import getpass
4 import sys
5 import traceback
6
7 from IPython.core import release
8 from IPython.utils.py3compat import builtin_mod, PY3
9 from IPython.utils.tokenutil import token_at_cursor
10 from IPython.utils.traitlets import Instance, Type, Any
11
1 from .kernelbase import KernelBase
12 from .kernelbase import KernelBase
13 from .serialize import serialize_object, unpack_apply_message
14 from .zmqshell import ZMQInteractiveShell
2
15
3 class Kernel(KernelBase):
16 class Kernel(KernelBase):
17 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
18 shell_class = Type(ZMQInteractiveShell)
19
20 user_module = Any()
21 def _user_module_changed(self, name, old, new):
22 if self.shell is not None:
23 self.shell.user_module = new
24
25 user_ns = Instance(dict, args=None, allow_none=True)
26 def _user_ns_changed(self, name, old, new):
27 if self.shell is not None:
28 self.shell.user_ns = new
29 self.shell.init_user_ns()
30
31 # A reference to the Python builtin 'raw_input' function.
32 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
33 _sys_raw_input = Any()
34 _sys_eval_input = Any()
35
36 def __init__(self, **kwargs):
37 super(Kernel, self).__init__(**kwargs)
38
39 # Initialize the InteractiveShell subclass
40 self.shell = self.shell_class.instance(parent=self,
41 profile_dir = self.profile_dir,
42 user_module = self.user_module,
43 user_ns = self.user_ns,
44 kernel = self,
45 )
46 self.shell.displayhook.session = self.session
47 self.shell.displayhook.pub_socket = self.iopub_socket
48 self.shell.displayhook.topic = self._topic('execute_result')
49 self.shell.display_pub.session = self.session
50 self.shell.display_pub.pub_socket = self.iopub_socket
51 self.shell.data_pub.session = self.session
52 self.shell.data_pub.pub_socket = self.iopub_socket
53
54 # TMP - hack while developing
55 self.shell._reply_content = None
56
57 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
58 comm_manager = self.shell.comm_manager
59 for msg_type in comm_msg_types:
60 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
61
62 # Kernel info fields
63 implementation = 'ipython'
64 implementation_version = release.version
65 language = 'python'
66 language_version = sys.version.split()[0]
67 @property
68 def banner(self):
69 return self.shell.banner
70
71 def set_parent(self, ident, parent):
72 """Overridden from parent to tell the display hook and output streams
73 about the parent message.
74 """
75 super(Kernel, self).set_parent(ident, parent)
76 self.shell.set_parent(parent)
77
78 def _forward_input(self, allow_stdin=False):
79 """Forward raw_input and getpass to the current frontend.
80
81 via input_request
82 """
83 self._allow_stdin = allow_stdin
84
85 if PY3:
86 self._sys_raw_input = builtin_mod.input
87 builtin_mod.input = self.raw_input
88 else:
89 self._sys_raw_input = builtin_mod.raw_input
90 self._sys_eval_input = builtin_mod.input
91 builtin_mod.raw_input = self.raw_input
92 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
93 self._save_getpass = getpass.getpass
94 getpass.getpass = self.getpass
95
96 def _restore_input(self):
97 """Restore raw_input, getpass"""
98 if PY3:
99 builtin_mod.input = self._sys_raw_input
100 else:
101 builtin_mod.raw_input = self._sys_raw_input
102 builtin_mod.input = self._sys_eval_input
103
104 getpass.getpass = self._save_getpass
105
106 @property
107 def execution_count(self):
108 return self.shell.execution_count
109
110 @execution_count.setter
111 def execution_count(self, value):
112 # Ignore the incrememnting done by KernelBase, in favour of our shell's
113 # execution counter.
4 pass
114 pass
115
116 def do_execute(self, code, silent, store_history=True,
117 user_expressions=None, allow_stdin=False):
118 shell = self.shell # we'll need this a lot here
119
120 self._forward_input(allow_stdin)
121
122 reply_content = {}
123 # FIXME: the shell calls the exception handler itself.
124 shell._reply_content = None
125 try:
126 shell.run_cell(code, store_history=store_history, silent=silent)
127 except:
128 status = u'error'
129 # FIXME: this code right now isn't being used yet by default,
130 # because the run_cell() call above directly fires off exception
131 # reporting. This code, therefore, is only active in the scenario
132 # where runlines itself has an unhandled exception. We need to
133 # uniformize this, for all exception construction to come from a
134 # single location in the codbase.
135 etype, evalue, tb = sys.exc_info()
136 tb_list = traceback.format_exception(etype, evalue, tb)
137 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
138 else:
139 status = u'ok'
140 finally:
141 self._restore_input()
142
143 reply_content[u'status'] = status
144
145 # Return the execution counter so clients can display prompts
146 reply_content['execution_count'] = shell.execution_count - 1
147
148 # FIXME - fish exception info out of shell, possibly left there by
149 # runlines. We'll need to clean up this logic later.
150 if shell._reply_content is not None:
151 reply_content.update(shell._reply_content)
152 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
153 reply_content['engine_info'] = e_info
154 # reset after use
155 shell._reply_content = None
156
157 if 'traceback' in reply_content:
158 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
159
160
161 # At this point, we can tell whether the main code execution succeeded
162 # or not. If it did, we proceed to evaluate user_expressions
163 if reply_content['status'] == 'ok':
164 reply_content[u'user_expressions'] = \
165 shell.user_expressions(user_expressions or {})
166 else:
167 # If there was an error, don't even try to compute expressions
168 reply_content[u'user_expressions'] = {}
169
170 # Payloads should be retrieved regardless of outcome, so we can both
171 # recover partial output (that could have been generated early in a
172 # block, before an error) and clear the payload system always.
173 reply_content[u'payload'] = shell.payload_manager.read_payload()
174 # Be agressive about clearing the payload because we don't want
175 # it to sit in memory until the next execute_request comes in.
176 shell.payload_manager.clear_payload()
177
178 return reply_content
179
180 def do_complete(self, code, cursor_pos):
181 txt, matches = self.shell.complete('', code, cursor_pos)
182 return {'matches' : matches,
183 'cursor_end' : cursor_pos,
184 'cursor_start' : cursor_pos - len(txt),
185 'metadata' : {},
186 'status' : 'ok'}
187
188 def do_inspect(self, code, cursor_pos, detail_level=0):
189 name = token_at_cursor(code, cursor_pos)
190 info = self.shell.object_inspect(name)
191
192 reply_content = {'status' : 'ok'}
193 reply_content['data'] = data = {}
194 reply_content['metadata'] = {}
195 reply_content['found'] = info['found']
196 if info['found']:
197 info_text = self.shell.object_inspect_text(
198 name,
199 detail_level=detail_level,
200 )
201 data['text/plain'] = info_text
202
203 return reply_content
204
205 def do_history(self, hist_access_type, output, raw, session=None, start=None,
206 stop=None, n=None, pattern=None, unique=False):
207 if hist_access_type == 'tail':
208 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
209 include_latest=True)
210
211 elif hist_access_type == 'range':
212 hist = self.shell.history_manager.get_range(session, start, stop,
213 raw=raw, output=output)
214
215 elif hist_access_type == 'search':
216 hist = self.shell.history_manager.search(
217 pattern, raw=raw, output=output, n=n, unique=unique)
218 else:
219 hist = []
220
221 return {'history' : list(hist)}
222
223 def do_shutdown(self, restart):
224 self.shell.exit_now = True
225 return dict(status='ok', restart=restart)
226
227 def do_apply(self, content, bufs, msg_id, reply_metadata):
228 shell = self.shell
229 try:
230 working = shell.user_ns
231
232 prefix = "_"+str(msg_id).replace("-","")+"_"
233
234 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
235
236 fname = getattr(f, '__name__', 'f')
237
238 fname = prefix+"f"
239 argname = prefix+"args"
240 kwargname = prefix+"kwargs"
241 resultname = prefix+"result"
242
243 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
244 # print ns
245 working.update(ns)
246 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
247 try:
248 exec(code, shell.user_global_ns, shell.user_ns)
249 result = working.get(resultname)
250 finally:
251 for key in ns:
252 working.pop(key)
253
254 result_buf = serialize_object(result,
255 buffer_threshold=self.session.buffer_threshold,
256 item_threshold=self.session.item_threshold,
257 )
258
259 except:
260 # invoke IPython traceback formatting
261 shell.showtraceback()
262 # FIXME - fish exception info out of shell, possibly left there by
263 # run_code. We'll need to clean up this logic later.
264 reply_content = {}
265 if shell._reply_content is not None:
266 reply_content.update(shell._reply_content)
267 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
268 reply_content['engine_info'] = e_info
269 # reset after use
270 shell._reply_content = None
271
272 self.send_response(self.iopub_socket, u'error', reply_content,
273 ident=self._topic('error'))
274 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
275 result_buf = []
276
277 if reply_content['ename'] == 'UnmetDependency':
278 reply_metadata['dependencies_met'] = False
279 else:
280 reply_content = {'status' : 'ok'}
281
282 return reply_content, result_buf
283
284 def do_clear(self):
285 self.shell.reset(False)
286 return dict(status='ok')
@@ -1,854 +1,646
1 """An interactive kernel that talks to frontends over 0MQ."""
1 """Base class for a kernel that talks to frontends over 0MQ."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import getpass
9 import sys
8 import sys
10 import time
9 import time
11 import traceback
12 import logging
10 import logging
13 import uuid
11 import uuid
14
12
15 from datetime import datetime
13 from datetime import datetime
16 from signal import (
14 from signal import (
17 signal, default_int_handler, SIGINT
15 signal, default_int_handler, SIGINT
18 )
16 )
19
17
20 import zmq
18 import zmq
21 from zmq.eventloop import ioloop
19 from zmq.eventloop import ioloop
22 from zmq.eventloop.zmqstream import ZMQStream
20 from zmq.eventloop.zmqstream import ZMQStream
23
21
24 from IPython.config.configurable import Configurable
22 from IPython.config.configurable import Configurable
25 from IPython.core.error import StdinNotImplementedError
23 from IPython.core.error import StdinNotImplementedError
26 from IPython.core import release
24 from IPython.core import release
27 from IPython.utils import py3compat
25 from IPython.utils import py3compat
28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
26 from IPython.utils.py3compat import unicode_type, string_types
29 from IPython.utils.jsonutil import json_clean
27 from IPython.utils.jsonutil import json_clean
30 from IPython.utils.tokenutil import token_at_cursor
31 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
33 Type, Bool,
34 )
30 )
35
31
36 from .serialize import serialize_object, unpack_apply_message
37 from .session import Session
32 from .session import Session
38 from .zmqshell import ZMQInteractiveShell
39
33
40
34
41 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
42 # Main kernel class
36 # Main kernel class
43 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
44
38
45 protocol_version = release.kernel_protocol_version
46 ipython_version = release.version
47 language_version = sys.version.split()[0]
48
49
50 class KernelBase(Configurable):
39 class KernelBase(Configurable):
51
40
52 #---------------------------------------------------------------------------
41 #---------------------------------------------------------------------------
53 # Kernel interface
42 # Kernel interface
54 #---------------------------------------------------------------------------
43 #---------------------------------------------------------------------------
55
44
56 # attribute to override with a GUI
45 # attribute to override with a GUI
57 eventloop = Any(None)
46 eventloop = Any(None)
58 def _eventloop_changed(self, name, old, new):
47 def _eventloop_changed(self, name, old, new):
59 """schedule call to eventloop from IOLoop"""
48 """schedule call to eventloop from IOLoop"""
60 loop = ioloop.IOLoop.instance()
49 loop = ioloop.IOLoop.instance()
61 loop.add_callback(self.enter_eventloop)
50 loop.add_callback(self.enter_eventloop)
62
51
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 shell_class = Type(ZMQInteractiveShell)
65
66 session = Instance(Session)
52 session = Instance(Session)
67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
53 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 shell_streams = List()
54 shell_streams = List()
69 control_stream = Instance(ZMQStream)
55 control_stream = Instance(ZMQStream)
70 iopub_socket = Instance(zmq.Socket)
56 iopub_socket = Instance(zmq.Socket)
71 stdin_socket = Instance(zmq.Socket)
57 stdin_socket = Instance(zmq.Socket)
72 log = Instance(logging.Logger)
58 log = Instance(logging.Logger)
73
59
74 user_module = Any()
75 def _user_module_changed(self, name, old, new):
76 if self.shell is not None:
77 self.shell.user_module = new
78
79 user_ns = Instance(dict, args=None, allow_none=True)
80 def _user_ns_changed(self, name, old, new):
81 if self.shell is not None:
82 self.shell.user_ns = new
83 self.shell.init_user_ns()
84
85 # identities:
60 # identities:
86 int_id = Integer(-1)
61 int_id = Integer(-1)
87 ident = Unicode()
62 ident = Unicode()
88
63
89 def _ident_default(self):
64 def _ident_default(self):
90 return unicode_type(uuid.uuid4())
65 return unicode_type(uuid.uuid4())
91
66
92 # Private interface
67 # Private interface
93
68
94 _darwin_app_nap = Bool(True, config=True,
69 _darwin_app_nap = Bool(True, config=True,
95 help="""Whether to use appnope for compatiblity with OS X App Nap.
70 help="""Whether to use appnope for compatiblity with OS X App Nap.
96
71
97 Only affects OS X >= 10.9.
72 Only affects OS X >= 10.9.
98 """
73 """
99 )
74 )
100
75
101 # track associations with current request
76 # track associations with current request
102 _allow_stdin = Bool(False)
77 _allow_stdin = Bool(False)
103 _parent_header = Dict()
78 _parent_header = Dict()
104 _parent_ident = Any(b'')
79 _parent_ident = Any(b'')
105 # Time to sleep after flushing the stdout/err buffers in each execute
80 # Time to sleep after flushing the stdout/err buffers in each execute
106 # cycle. While this introduces a hard limit on the minimal latency of the
81 # cycle. While this introduces a hard limit on the minimal latency of the
107 # execute cycle, it helps prevent output synchronization problems for
82 # execute cycle, it helps prevent output synchronization problems for
108 # clients.
83 # clients.
109 # Units are in seconds. The minimum zmq latency on local host is probably
84 # Units are in seconds. The minimum zmq latency on local host is probably
110 # ~150 microseconds, set this to 500us for now. We may need to increase it
85 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 # a little if it's not enough after more interactive testing.
86 # a little if it's not enough after more interactive testing.
112 _execute_sleep = Float(0.0005, config=True)
87 _execute_sleep = Float(0.0005, config=True)
113
88
114 # Frequency of the kernel's event loop.
89 # Frequency of the kernel's event loop.
115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
90 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 # adapt to milliseconds.
91 # adapt to milliseconds.
117 _poll_interval = Float(0.05, config=True)
92 _poll_interval = Float(0.05, config=True)
118
93
119 # If the shutdown was requested over the network, we leave here the
94 # If the shutdown was requested over the network, we leave here the
120 # necessary reply message so it can be sent by our registered atexit
95 # necessary reply message so it can be sent by our registered atexit
121 # handler. This ensures that the reply is only sent to clients truly at
96 # handler. This ensures that the reply is only sent to clients truly at
122 # the end of our shutdown process (which happens after the underlying
97 # the end of our shutdown process (which happens after the underlying
123 # IPython shell's own shutdown).
98 # IPython shell's own shutdown).
124 _shutdown_message = None
99 _shutdown_message = None
125
100
126 # This is a dict of port number that the kernel is listening on. It is set
101 # This is a dict of port number that the kernel is listening on. It is set
127 # by record_ports and used by connect_request.
102 # by record_ports and used by connect_request.
128 _recorded_ports = Dict()
103 _recorded_ports = Dict()
129
104
130 # A reference to the Python builtin 'raw_input' function.
131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 _sys_raw_input = Any()
133 _sys_eval_input = Any()
134
135 # set of aborted msg_ids
105 # set of aborted msg_ids
136 aborted = Set()
106 aborted = Set()
137
107
108 # Track execution count here. For IPython, we override this to use the
109 # execution count we store in the shell.
110 execution_count = 0
111
138
112
139 def __init__(self, **kwargs):
113 def __init__(self, **kwargs):
140 super(KernelBase, self).__init__(**kwargs)
114 super(KernelBase, self).__init__(**kwargs)
141
115
142 # Initialize the InteractiveShell subclass
143 self.shell = self.shell_class.instance(parent=self,
144 profile_dir = self.profile_dir,
145 user_module = self.user_module,
146 user_ns = self.user_ns,
147 kernel = self,
148 )
149 self.shell.displayhook.session = self.session
150 self.shell.displayhook.pub_socket = self.iopub_socket
151 self.shell.displayhook.topic = self._topic('execute_result')
152 self.shell.display_pub.session = self.session
153 self.shell.display_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.session = self.session
155 self.shell.data_pub.pub_socket = self.iopub_socket
156
157 # TMP - hack while developing
158 self.shell._reply_content = None
159
160 # Build dict of handlers for message types
116 # Build dict of handlers for message types
161 msg_types = [ 'execute_request', 'complete_request',
117 msg_types = [ 'execute_request', 'complete_request',
162 'inspect_request', 'history_request',
118 'inspect_request', 'history_request',
163 'kernel_info_request',
119 'kernel_info_request',
164 'connect_request', 'shutdown_request',
120 'connect_request', 'shutdown_request',
165 'apply_request',
121 'apply_request',
166 ]
122 ]
167 self.shell_handlers = {}
123 self.shell_handlers = {}
168 for msg_type in msg_types:
124 for msg_type in msg_types:
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
125 self.shell_handlers[msg_type] = getattr(self, msg_type)
170
126
171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
172 comm_manager = self.shell.comm_manager
173 for msg_type in comm_msg_types:
174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
175
176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
127 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
177 self.control_handlers = {}
128 self.control_handlers = {}
178 for msg_type in control_msg_types:
129 for msg_type in control_msg_types:
179 self.control_handlers[msg_type] = getattr(self, msg_type)
130 self.control_handlers[msg_type] = getattr(self, msg_type)
180
131
181
132
182 def dispatch_control(self, msg):
133 def dispatch_control(self, msg):
183 """dispatch control requests"""
134 """dispatch control requests"""
184 idents,msg = self.session.feed_identities(msg, copy=False)
135 idents,msg = self.session.feed_identities(msg, copy=False)
185 try:
136 try:
186 msg = self.session.unserialize(msg, content=True, copy=False)
137 msg = self.session.unserialize(msg, content=True, copy=False)
187 except:
138 except:
188 self.log.error("Invalid Control Message", exc_info=True)
139 self.log.error("Invalid Control Message", exc_info=True)
189 return
140 return
190
141
191 self.log.debug("Control received: %s", msg)
142 self.log.debug("Control received: %s", msg)
192
143
193 header = msg['header']
144 header = msg['header']
194 msg_id = header['msg_id']
195 msg_type = header['msg_type']
145 msg_type = header['msg_type']
196
146
197 handler = self.control_handlers.get(msg_type, None)
147 handler = self.control_handlers.get(msg_type, None)
198 if handler is None:
148 if handler is None:
199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
200 else:
150 else:
201 try:
151 try:
202 handler(self.control_stream, idents, msg)
152 handler(self.control_stream, idents, msg)
203 except Exception:
153 except Exception:
204 self.log.error("Exception in control handler:", exc_info=True)
154 self.log.error("Exception in control handler:", exc_info=True)
205
155
206 def dispatch_shell(self, stream, msg):
156 def dispatch_shell(self, stream, msg):
207 """dispatch shell requests"""
157 """dispatch shell requests"""
208 # flush control requests first
158 # flush control requests first
209 if self.control_stream:
159 if self.control_stream:
210 self.control_stream.flush()
160 self.control_stream.flush()
211
161
212 idents,msg = self.session.feed_identities(msg, copy=False)
162 idents,msg = self.session.feed_identities(msg, copy=False)
213 try:
163 try:
214 msg = self.session.unserialize(msg, content=True, copy=False)
164 msg = self.session.unserialize(msg, content=True, copy=False)
215 except:
165 except:
216 self.log.error("Invalid Message", exc_info=True)
166 self.log.error("Invalid Message", exc_info=True)
217 return
167 return
218
168
219 header = msg['header']
169 header = msg['header']
220 msg_id = header['msg_id']
170 msg_id = header['msg_id']
221 msg_type = msg['header']['msg_type']
171 msg_type = msg['header']['msg_type']
222
172
223 # Print some info about this message and leave a '--->' marker, so it's
173 # Print some info about this message and leave a '--->' marker, so it's
224 # easier to trace visually the message chain when debugging. Each
174 # easier to trace visually the message chain when debugging. Each
225 # handler prints its message at the end.
175 # handler prints its message at the end.
226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
176 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
177 self.log.debug(' Content: %s\n --->\n ', msg['content'])
228
178
229 if msg_id in self.aborted:
179 if msg_id in self.aborted:
230 self.aborted.remove(msg_id)
180 self.aborted.remove(msg_id)
231 # is it safe to assume a msg_id will not be resubmitted?
181 # is it safe to assume a msg_id will not be resubmitted?
232 reply_type = msg_type.split('_')[0] + '_reply'
182 reply_type = msg_type.split('_')[0] + '_reply'
233 status = {'status' : 'aborted'}
183 status = {'status' : 'aborted'}
234 md = {'engine' : self.ident}
184 md = {'engine' : self.ident}
235 md.update(status)
185 md.update(status)
236 reply_msg = self.session.send(stream, reply_type, metadata=md,
186 self.session.send(stream, reply_type, metadata=md,
237 content=status, parent=msg, ident=idents)
187 content=status, parent=msg, ident=idents)
238 return
188 return
239
189
240 handler = self.shell_handlers.get(msg_type, None)
190 handler = self.shell_handlers.get(msg_type, None)
241 if handler is None:
191 if handler is None:
242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
192 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
243 else:
193 else:
244 # ensure default_int_handler during handler call
194 # ensure default_int_handler during handler call
245 sig = signal(SIGINT, default_int_handler)
195 sig = signal(SIGINT, default_int_handler)
246 self.log.debug("%s: %s", msg_type, msg)
196 self.log.debug("%s: %s", msg_type, msg)
247 try:
197 try:
248 handler(stream, idents, msg)
198 handler(stream, idents, msg)
249 except Exception:
199 except Exception:
250 self.log.error("Exception in message handler:", exc_info=True)
200 self.log.error("Exception in message handler:", exc_info=True)
251 finally:
201 finally:
252 signal(SIGINT, sig)
202 signal(SIGINT, sig)
253
203
254 def enter_eventloop(self):
204 def enter_eventloop(self):
255 """enter eventloop"""
205 """enter eventloop"""
256 self.log.info("entering eventloop %s", self.eventloop)
206 self.log.info("entering eventloop %s", self.eventloop)
257 for stream in self.shell_streams:
207 for stream in self.shell_streams:
258 # flush any pending replies,
208 # flush any pending replies,
259 # which may be skipped by entering the eventloop
209 # which may be skipped by entering the eventloop
260 stream.flush(zmq.POLLOUT)
210 stream.flush(zmq.POLLOUT)
261 # restore default_int_handler
211 # restore default_int_handler
262 signal(SIGINT, default_int_handler)
212 signal(SIGINT, default_int_handler)
263 while self.eventloop is not None:
213 while self.eventloop is not None:
264 try:
214 try:
265 self.eventloop(self)
215 self.eventloop(self)
266 except KeyboardInterrupt:
216 except KeyboardInterrupt:
267 # Ctrl-C shouldn't crash the kernel
217 # Ctrl-C shouldn't crash the kernel
268 self.log.error("KeyboardInterrupt caught in kernel")
218 self.log.error("KeyboardInterrupt caught in kernel")
269 continue
219 continue
270 else:
220 else:
271 # eventloop exited cleanly, this means we should stop (right?)
221 # eventloop exited cleanly, this means we should stop (right?)
272 self.eventloop = None
222 self.eventloop = None
273 break
223 break
274 self.log.info("exiting eventloop")
224 self.log.info("exiting eventloop")
275
225
276 def start(self):
226 def start(self):
277 """register dispatchers for streams"""
227 """register dispatchers for streams"""
278 self.shell.exit_now = False
228 self.shell.exit_now = False
279 if self.control_stream:
229 if self.control_stream:
280 self.control_stream.on_recv(self.dispatch_control, copy=False)
230 self.control_stream.on_recv(self.dispatch_control, copy=False)
281
231
282 def make_dispatcher(stream):
232 def make_dispatcher(stream):
283 def dispatcher(msg):
233 def dispatcher(msg):
284 return self.dispatch_shell(stream, msg)
234 return self.dispatch_shell(stream, msg)
285 return dispatcher
235 return dispatcher
286
236
287 for s in self.shell_streams:
237 for s in self.shell_streams:
288 s.on_recv(make_dispatcher(s), copy=False)
238 s.on_recv(make_dispatcher(s), copy=False)
289
239
290 # publish idle status
240 # publish idle status
291 self._publish_status('starting')
241 self._publish_status('starting')
292
242
293 def do_one_iteration(self):
243 def do_one_iteration(self):
294 """step eventloop just once"""
244 """step eventloop just once"""
295 if self.control_stream:
245 if self.control_stream:
296 self.control_stream.flush()
246 self.control_stream.flush()
297 for stream in self.shell_streams:
247 for stream in self.shell_streams:
298 # handle at most one request per iteration
248 # handle at most one request per iteration
299 stream.flush(zmq.POLLIN, 1)
249 stream.flush(zmq.POLLIN, 1)
300 stream.flush(zmq.POLLOUT)
250 stream.flush(zmq.POLLOUT)
301
251
302
252
303 def record_ports(self, ports):
253 def record_ports(self, ports):
304 """Record the ports that this kernel is using.
254 """Record the ports that this kernel is using.
305
255
306 The creator of the Kernel instance must call this methods if they
256 The creator of the Kernel instance must call this methods if they
307 want the :meth:`connect_request` method to return the port numbers.
257 want the :meth:`connect_request` method to return the port numbers.
308 """
258 """
309 self._recorded_ports = ports
259 self._recorded_ports = ports
310
260
311 #---------------------------------------------------------------------------
261 #---------------------------------------------------------------------------
312 # Kernel request handlers
262 # Kernel request handlers
313 #---------------------------------------------------------------------------
263 #---------------------------------------------------------------------------
314
264
315 def _make_metadata(self, other=None):
265 def _make_metadata(self, other=None):
316 """init metadata dict, for execute/apply_reply"""
266 """init metadata dict, for execute/apply_reply"""
317 new_md = {
267 new_md = {
318 'dependencies_met' : True,
268 'dependencies_met' : True,
319 'engine' : self.ident,
269 'engine' : self.ident,
320 'started': datetime.now(),
270 'started': datetime.now(),
321 }
271 }
322 if other:
272 if other:
323 new_md.update(other)
273 new_md.update(other)
324 return new_md
274 return new_md
325
275
326 def _publish_execute_input(self, code, parent, execution_count):
276 def _publish_execute_input(self, code, parent, execution_count):
327 """Publish the code request on the iopub stream."""
277 """Publish the code request on the iopub stream."""
328
278
329 self.session.send(self.iopub_socket, u'execute_input',
279 self.session.send(self.iopub_socket, u'execute_input',
330 {u'code':code, u'execution_count': execution_count},
280 {u'code':code, u'execution_count': execution_count},
331 parent=parent, ident=self._topic('execute_input')
281 parent=parent, ident=self._topic('execute_input')
332 )
282 )
333
283
334 def _publish_status(self, status, parent=None):
284 def _publish_status(self, status, parent=None):
335 """send status (busy/idle) on IOPub"""
285 """send status (busy/idle) on IOPub"""
336 self.session.send(self.iopub_socket,
286 self.session.send(self.iopub_socket,
337 u'status',
287 u'status',
338 {u'execution_state': status},
288 {u'execution_state': status},
339 parent=parent,
289 parent=parent,
340 ident=self._topic('status'),
290 ident=self._topic('status'),
341 )
291 )
342
292
343 def _forward_input(self, allow_stdin=False):
344 """Forward raw_input and getpass to the current frontend.
345
346 via input_request
347 """
348 self._allow_stdin = allow_stdin
349
350 if py3compat.PY3:
351 self._sys_raw_input = builtin_mod.input
352 builtin_mod.input = self.raw_input
353 else:
354 self._sys_raw_input = builtin_mod.raw_input
355 self._sys_eval_input = builtin_mod.input
356 builtin_mod.raw_input = self.raw_input
357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
358 self._save_getpass = getpass.getpass
359 getpass.getpass = self.getpass
360
361 def _restore_input(self):
362 """Restore raw_input, getpass"""
363 if py3compat.PY3:
364 builtin_mod.input = self._sys_raw_input
365 else:
366 builtin_mod.raw_input = self._sys_raw_input
367 builtin_mod.input = self._sys_eval_input
368
369 getpass.getpass = self._save_getpass
370
371 def set_parent(self, ident, parent):
293 def set_parent(self, ident, parent):
372 """Set the current parent_header
294 """Set the current parent_header
373
295
374 Side effects (IOPub messages) and replies are associated with
296 Side effects (IOPub messages) and replies are associated with
375 the request that caused them via the parent_header.
297 the request that caused them via the parent_header.
376
298
377 The parent identity is used to route input_request messages
299 The parent identity is used to route input_request messages
378 on the stdin channel.
300 on the stdin channel.
379 """
301 """
380 self._parent_ident = ident
302 self._parent_ident = ident
381 self._parent_header = parent
303 self._parent_header = parent
382 self.shell.set_parent(parent)
304
305 def send_response(self, stream, msg_or_type, content=None, ident=None,
306 buffers=None, track=False, header=None, metadata=None):
307 """Send a response to the message we're currently processing.
308
309 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
310 except ``parent``.
311
312 This relies on :meth:`set_parent` having been called for the current
313 message.
314 """
315 return self.session.send(stream, msg_or_type, content, self._parent_header,
316 ident, buffers, track, header, metadata)
383
317
384 def execute_request(self, stream, ident, parent):
318 def execute_request(self, stream, ident, parent):
385 """handle an execute_request"""
319 """handle an execute_request"""
386
320
387 self._publish_status(u'busy', parent)
321 self._publish_status(u'busy', parent)
388
322
389 try:
323 try:
390 content = parent[u'content']
324 content = parent[u'content']
391 code = py3compat.cast_unicode_py2(content[u'code'])
325 code = py3compat.cast_unicode_py2(content[u'code'])
392 silent = content[u'silent']
326 silent = content[u'silent']
393 store_history = content.get(u'store_history', not silent)
327 store_history = content.get(u'store_history', not silent)
328 user_expressions = content.get('user_expressions', {})
329 allow_stdin = content.get('allow_stdin', False)
394 except:
330 except:
395 self.log.error("Got bad msg: ")
331 self.log.error("Got bad msg: ")
396 self.log.error("%s", parent)
332 self.log.error("%s", parent)
397 return
333 return
398
334
399 md = self._make_metadata(parent['metadata'])
335 md = self._make_metadata(parent['metadata'])
400
336
401 shell = self.shell # we'll need this a lot here
402
403 self._forward_input(content.get('allow_stdin', False))
404 # Set the parent message of the display hook and out streams.
337 # Set the parent message of the display hook and out streams.
405 self.set_parent(ident, parent)
338 self.set_parent(ident, parent)
406
339
407 # Re-broadcast our input for the benefit of listening clients, and
340 # Re-broadcast our input for the benefit of listening clients, and
408 # start computing output
341 # start computing output
409 if not silent:
342 if not silent:
410 self._publish_execute_input(code, parent, shell.execution_count)
343 self.execution_count += 1
411
344 self._publish_execute_input(code, parent, self.execution_count)
412 reply_content = {}
413 # FIXME: the shell calls the exception handler itself.
414 shell._reply_content = None
415 try:
416 shell.run_cell(code, store_history=store_history, silent=silent)
417 except:
418 status = u'error'
419 # FIXME: this code right now isn't being used yet by default,
420 # because the run_cell() call above directly fires off exception
421 # reporting. This code, therefore, is only active in the scenario
422 # where runlines itself has an unhandled exception. We need to
423 # uniformize this, for all exception construction to come from a
424 # single location in the codbase.
425 etype, evalue, tb = sys.exc_info()
426 tb_list = traceback.format_exception(etype, evalue, tb)
427 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
428 else:
429 status = u'ok'
430 finally:
431 self._restore_input()
432
433 reply_content[u'status'] = status
434
435 # Return the execution counter so clients can display prompts
436 reply_content['execution_count'] = shell.execution_count - 1
437
438 # FIXME - fish exception info out of shell, possibly left there by
439 # runlines. We'll need to clean up this logic later.
440 if shell._reply_content is not None:
441 reply_content.update(shell._reply_content)
442 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
443 reply_content['engine_info'] = e_info
444 # reset after use
445 shell._reply_content = None
446
345
447 if 'traceback' in reply_content:
346 reply_content = self.do_execute(code, silent, store_history,
448 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
347 user_expressions, allow_stdin)
449
450
451 # At this point, we can tell whether the main code execution succeeded
452 # or not. If it did, we proceed to evaluate user_expressions
453 if reply_content['status'] == 'ok':
454 reply_content[u'user_expressions'] = \
455 shell.user_expressions(content.get(u'user_expressions', {}))
456 else:
457 # If there was an error, don't even try to compute expressions
458 reply_content[u'user_expressions'] = {}
459
460 # Payloads should be retrieved regardless of outcome, so we can both
461 # recover partial output (that could have been generated early in a
462 # block, before an error) and clear the payload system always.
463 reply_content[u'payload'] = shell.payload_manager.read_payload()
464 # Be agressive about clearing the payload because we don't want
465 # it to sit in memory until the next execute_request comes in.
466 shell.payload_manager.clear_payload()
467
348
468 # Flush output before sending the reply.
349 # Flush output before sending the reply.
469 sys.stdout.flush()
350 sys.stdout.flush()
470 sys.stderr.flush()
351 sys.stderr.flush()
471 # FIXME: on rare occasions, the flush doesn't seem to make it to the
352 # FIXME: on rare occasions, the flush doesn't seem to make it to the
472 # clients... This seems to mitigate the problem, but we definitely need
353 # clients... This seems to mitigate the problem, but we definitely need
473 # to better understand what's going on.
354 # to better understand what's going on.
474 if self._execute_sleep:
355 if self._execute_sleep:
475 time.sleep(self._execute_sleep)
356 time.sleep(self._execute_sleep)
476
357
477 # Send the reply.
358 # Send the reply.
478 reply_content = json_clean(reply_content)
359 reply_content = json_clean(reply_content)
479
360
480 md['status'] = reply_content['status']
361 md['status'] = reply_content['status']
481 if reply_content['status'] == 'error' and \
362 if reply_content['status'] == 'error' and \
482 reply_content['ename'] == 'UnmetDependency':
363 reply_content['ename'] == 'UnmetDependency':
483 md['dependencies_met'] = False
364 md['dependencies_met'] = False
484
365
485 reply_msg = self.session.send(stream, u'execute_reply',
366 reply_msg = self.session.send(stream, u'execute_reply',
486 reply_content, parent, metadata=md,
367 reply_content, parent, metadata=md,
487 ident=ident)
368 ident=ident)
488
369
489 self.log.debug("%s", reply_msg)
370 self.log.debug("%s", reply_msg)
490
371
491 if not silent and reply_msg['content']['status'] == u'error':
372 if not silent and reply_msg['content']['status'] == u'error':
492 self._abort_queues()
373 self._abort_queues()
493
374
494 self._publish_status(u'idle', parent)
375 self._publish_status(u'idle', parent)
495
376
496 def complete_request(self, stream, ident, parent):
377 def complete_request(self, stream, ident, parent):
497 content = parent['content']
378 content = parent['content']
498 code = content['code']
379 code = content['code']
499 cursor_pos = content['cursor_pos']
380 cursor_pos = content['cursor_pos']
500
381
501 txt, matches = self.shell.complete('', code, cursor_pos)
382 matches = self.do_complete(code, cursor_pos)
502 matches = {'matches' : matches,
503 'cursor_end' : cursor_pos,
504 'cursor_start' : cursor_pos - len(txt),
505 'metadata' : {},
506 'status' : 'ok'}
507 matches = json_clean(matches)
383 matches = json_clean(matches)
508 completion_msg = self.session.send(stream, 'complete_reply',
384 completion_msg = self.session.send(stream, 'complete_reply',
509 matches, parent, ident)
385 matches, parent, ident)
510 self.log.debug("%s", completion_msg)
386 self.log.debug("%s", completion_msg)
511
387
512 def inspect_request(self, stream, ident, parent):
388 def inspect_request(self, stream, ident, parent):
513 content = parent['content']
389 content = parent['content']
514
390
515 name = token_at_cursor(content['code'], content['cursor_pos'])
391 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
516 info = self.shell.object_inspect(name)
392 content.get('detail_level', 0))
517
518 reply_content = {'status' : 'ok'}
519 reply_content['data'] = data = {}
520 reply_content['metadata'] = {}
521 reply_content['found'] = info['found']
522 if info['found']:
523 info_text = self.shell.object_inspect_text(
524 name,
525 detail_level=content.get('detail_level', 0),
526 )
527 reply_content['data']['text/plain'] = info_text
528 # Before we send this object over, we scrub it for JSON usage
393 # Before we send this object over, we scrub it for JSON usage
529 reply_content = json_clean(reply_content)
394 reply_content = json_clean(reply_content)
530 msg = self.session.send(stream, 'inspect_reply',
395 msg = self.session.send(stream, 'inspect_reply',
531 reply_content, parent, ident)
396 reply_content, parent, ident)
532 self.log.debug("%s", msg)
397 self.log.debug("%s", msg)
533
398
534 def history_request(self, stream, ident, parent):
399 def history_request(self, stream, ident, parent):
535 # We need to pull these out, as passing **kwargs doesn't work with
400 # We need to pull these out, as passing **kwargs doesn't work with
536 # unicode keys before Python 2.6.5.
401 # unicode keys before Python 2.6.5.
537 hist_access_type = parent['content']['hist_access_type']
402 content = parent['content']
538 raw = parent['content']['raw']
539 output = parent['content']['output']
540 if hist_access_type == 'tail':
541 n = parent['content']['n']
542 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
543 include_latest=True)
544
545 elif hist_access_type == 'range':
546 session = parent['content']['session']
547 start = parent['content']['start']
548 stop = parent['content']['stop']
549 hist = self.shell.history_manager.get_range(session, start, stop,
550 raw=raw, output=output)
551
552 elif hist_access_type == 'search':
553 n = parent['content'].get('n')
554 unique = parent['content'].get('unique', False)
555 pattern = parent['content']['pattern']
556 hist = self.shell.history_manager.search(
557 pattern, raw=raw, output=output, n=n, unique=unique)
558
403
559 else:
404 reply_content = self.do_history(content['hist_access_type'],
560 hist = []
405 content['output'], content['raw'],
561 hist = list(hist)
406 content.get('session', None),
562 content = {'history' : hist}
407 content.get('start', None),
563 content = json_clean(content)
408 content.get('stop', None),
409 content.get('n', None),
410 content.get('pattern', None),
411 content.get('unique', False),
412 )
413
414 reply_content = json_clean(reply_content)
564 msg = self.session.send(stream, 'history_reply',
415 msg = self.session.send(stream, 'history_reply',
565 content, parent, ident)
416 reply_content, parent, ident)
566 self.log.debug("Sending history reply with %i entries", len(hist))
417 self.log.debug("%s", msg)
567
418
568 def connect_request(self, stream, ident, parent):
419 def connect_request(self, stream, ident, parent):
569 if self._recorded_ports is not None:
420 if self._recorded_ports is not None:
570 content = self._recorded_ports.copy()
421 content = self._recorded_ports.copy()
571 else:
422 else:
572 content = {}
423 content = {}
573 msg = self.session.send(stream, 'connect_reply',
424 msg = self.session.send(stream, 'connect_reply',
574 content, parent, ident)
425 content, parent, ident)
575 self.log.debug("%s", msg)
426 self.log.debug("%s", msg)
576
427
577 def kernel_info_request(self, stream, ident, parent):
428 @property
578 vinfo = {
429 def kernel_info(self):
579 'protocol_version': protocol_version,
430 return {
580 'implementation': 'ipython',
431 'protocol_version': release.kernel_protocol_version,
581 'implementation_version': ipython_version,
432 'implementation': self.implementation,
582 'language_version': language_version,
433 'implementation_version': self.implementation_version,
583 'language': 'python',
434 'language': self.language,
584 'banner': self.shell.banner,
435 'language_version': self.language_version,
436 'banner': self.banner,
585 }
437 }
438
439 def kernel_info_request(self, stream, ident, parent):
586 msg = self.session.send(stream, 'kernel_info_reply',
440 msg = self.session.send(stream, 'kernel_info_reply',
587 vinfo, parent, ident)
441 self.kernel_info, parent, ident)
588 self.log.debug("%s", msg)
442 self.log.debug("%s", msg)
589
443
590 def shutdown_request(self, stream, ident, parent):
444 def shutdown_request(self, stream, ident, parent):
591 self.shell.exit_now = True
445 content = self.do_shutdown(parent['content']['restart'])
592 content = dict(status='ok')
593 content.update(parent['content'])
594 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
446 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
595 # same content, but different msg_id for broadcasting on IOPub
447 # same content, but different msg_id for broadcasting on IOPub
596 self._shutdown_message = self.session.msg(u'shutdown_reply',
448 self._shutdown_message = self.session.msg(u'shutdown_reply',
597 content, parent
449 content, parent
598 )
450 )
599
451
600 self._at_shutdown()
452 self._at_shutdown()
601 # call sys.exit after a short delay
453 # call sys.exit after a short delay
602 loop = ioloop.IOLoop.instance()
454 loop = ioloop.IOLoop.instance()
603 loop.add_timeout(time.time()+0.1, loop.stop)
455 loop.add_timeout(time.time()+0.1, loop.stop)
604
456
605 #---------------------------------------------------------------------------
457 #---------------------------------------------------------------------------
606 # Engine methods
458 # Engine methods
607 #---------------------------------------------------------------------------
459 #---------------------------------------------------------------------------
608
460
609 def apply_request(self, stream, ident, parent):
461 def apply_request(self, stream, ident, parent):
610 try:
462 try:
611 content = parent[u'content']
463 content = parent[u'content']
612 bufs = parent[u'buffers']
464 bufs = parent[u'buffers']
613 msg_id = parent['header']['msg_id']
465 msg_id = parent['header']['msg_id']
614 except:
466 except:
615 self.log.error("Got bad msg: %s", parent, exc_info=True)
467 self.log.error("Got bad msg: %s", parent, exc_info=True)
616 return
468 return
617
469
618 self._publish_status(u'busy', parent)
470 self._publish_status(u'busy', parent)
619
471
620 # Set the parent message of the display hook and out streams.
472 # Set the parent message of the display hook and out streams.
621 shell = self.shell
473 self.set_parent(ident, parent)
622 shell.set_parent(parent)
623
474
624 md = self._make_metadata(parent['metadata'])
475 md = self._make_metadata(parent['metadata'])
625 try:
626 working = shell.user_ns
627
628 prefix = "_"+str(msg_id).replace("-","")+"_"
629
630 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
631
632 fname = getattr(f, '__name__', 'f')
633
634 fname = prefix+"f"
635 argname = prefix+"args"
636 kwargname = prefix+"kwargs"
637 resultname = prefix+"result"
638
639 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
640 # print ns
641 working.update(ns)
642 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
643 try:
644 exec(code, shell.user_global_ns, shell.user_ns)
645 result = working.get(resultname)
646 finally:
647 for key in ns:
648 working.pop(key)
649
650 result_buf = serialize_object(result,
651 buffer_threshold=self.session.buffer_threshold,
652 item_threshold=self.session.item_threshold,
653 )
654
476
655 except:
477 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
656 # invoke IPython traceback formatting
657 shell.showtraceback()
658 # FIXME - fish exception info out of shell, possibly left there by
659 # run_code. We'll need to clean up this logic later.
660 reply_content = {}
661 if shell._reply_content is not None:
662 reply_content.update(shell._reply_content)
663 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
664 reply_content['engine_info'] = e_info
665 # reset after use
666 shell._reply_content = None
667
668 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
669 ident=self._topic('error'))
670 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
671 result_buf = []
672
673 if reply_content['ename'] == 'UnmetDependency':
674 md['dependencies_met'] = False
675 else:
676 reply_content = {'status' : 'ok'}
677
478
678 # put 'ok'/'error' status in header, for scheduler introspection:
479 # put 'ok'/'error' status in header, for scheduler introspection:
679 md['status'] = reply_content['status']
480 md['status'] = reply_content['status']
680
481
681 # flush i/o
482 # flush i/o
682 sys.stdout.flush()
483 sys.stdout.flush()
683 sys.stderr.flush()
484 sys.stderr.flush()
684
485
685 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
486 self.session.send(stream, u'apply_reply', reply_content,
686 parent=parent, ident=ident,buffers=result_buf, metadata=md)
487 parent=parent, ident=ident,buffers=result_buf, metadata=md)
687
488
688 self._publish_status(u'idle', parent)
489 self._publish_status(u'idle', parent)
689
490
690 #---------------------------------------------------------------------------
491 #---------------------------------------------------------------------------
691 # Control messages
492 # Control messages
692 #---------------------------------------------------------------------------
493 #---------------------------------------------------------------------------
693
494
694 def abort_request(self, stream, ident, parent):
495 def abort_request(self, stream, ident, parent):
695 """abort a specifig msg by id"""
496 """abort a specifig msg by id"""
696 msg_ids = parent['content'].get('msg_ids', None)
497 msg_ids = parent['content'].get('msg_ids', None)
697 if isinstance(msg_ids, string_types):
498 if isinstance(msg_ids, string_types):
698 msg_ids = [msg_ids]
499 msg_ids = [msg_ids]
699 if not msg_ids:
500 if not msg_ids:
700 self.abort_queues()
501 self.abort_queues()
701 for mid in msg_ids:
502 for mid in msg_ids:
702 self.aborted.add(str(mid))
503 self.aborted.add(str(mid))
703
504
704 content = dict(status='ok')
505 content = dict(status='ok')
705 reply_msg = self.session.send(stream, 'abort_reply', content=content,
506 reply_msg = self.session.send(stream, 'abort_reply', content=content,
706 parent=parent, ident=ident)
507 parent=parent, ident=ident)
707 self.log.debug("%s", reply_msg)
508 self.log.debug("%s", reply_msg)
708
509
709 def clear_request(self, stream, idents, parent):
510 def clear_request(self, stream, idents, parent):
710 """Clear our namespace."""
511 """Clear our namespace."""
711 self.shell.reset(False)
512 content = self.do_clear()
712 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
513 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
713 content = dict(status='ok'))
514 content = content)
714
715
515
716 #---------------------------------------------------------------------------
516 #---------------------------------------------------------------------------
717 # Protected interface
517 # Protected interface
718 #---------------------------------------------------------------------------
518 #---------------------------------------------------------------------------
719
519
720 def _wrap_exception(self, method=None):
721 # import here, because _wrap_exception is only used in parallel,
722 # and parallel has higher min pyzmq version
723 from IPython.parallel.error import wrap_exception
724 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
725 content = wrap_exception(e_info)
726 return content
727
728 def _topic(self, topic):
520 def _topic(self, topic):
729 """prefixed topic for IOPub messages"""
521 """prefixed topic for IOPub messages"""
730 if self.int_id >= 0:
522 if self.int_id >= 0:
731 base = "engine.%i" % self.int_id
523 base = "engine.%i" % self.int_id
732 else:
524 else:
733 base = "kernel.%s" % self.ident
525 base = "kernel.%s" % self.ident
734
526
735 return py3compat.cast_bytes("%s.%s" % (base, topic))
527 return py3compat.cast_bytes("%s.%s" % (base, topic))
736
528
737 def _abort_queues(self):
529 def _abort_queues(self):
738 for stream in self.shell_streams:
530 for stream in self.shell_streams:
739 if stream:
531 if stream:
740 self._abort_queue(stream)
532 self._abort_queue(stream)
741
533
742 def _abort_queue(self, stream):
534 def _abort_queue(self, stream):
743 poller = zmq.Poller()
535 poller = zmq.Poller()
744 poller.register(stream.socket, zmq.POLLIN)
536 poller.register(stream.socket, zmq.POLLIN)
745 while True:
537 while True:
746 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
538 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
747 if msg is None:
539 if msg is None:
748 return
540 return
749
541
750 self.log.info("Aborting:")
542 self.log.info("Aborting:")
751 self.log.info("%s", msg)
543 self.log.info("%s", msg)
752 msg_type = msg['header']['msg_type']
544 msg_type = msg['header']['msg_type']
753 reply_type = msg_type.split('_')[0] + '_reply'
545 reply_type = msg_type.split('_')[0] + '_reply'
754
546
755 status = {'status' : 'aborted'}
547 status = {'status' : 'aborted'}
756 md = {'engine' : self.ident}
548 md = {'engine' : self.ident}
757 md.update(status)
549 md.update(status)
758 reply_msg = self.session.send(stream, reply_type, metadata=md,
550 reply_msg = self.session.send(stream, reply_type, metadata=md,
759 content=status, parent=msg, ident=idents)
551 content=status, parent=msg, ident=idents)
760 self.log.debug("%s", reply_msg)
552 self.log.debug("%s", reply_msg)
761 # We need to wait a bit for requests to come in. This can probably
553 # We need to wait a bit for requests to come in. This can probably
762 # be set shorter for true asynchronous clients.
554 # be set shorter for true asynchronous clients.
763 poller.poll(50)
555 poller.poll(50)
764
556
765
557
766 def _no_raw_input(self):
558 def _no_raw_input(self):
767 """Raise StdinNotImplentedError if active frontend doesn't support
559 """Raise StdinNotImplentedError if active frontend doesn't support
768 stdin."""
560 stdin."""
769 raise StdinNotImplementedError("raw_input was called, but this "
561 raise StdinNotImplementedError("raw_input was called, but this "
770 "frontend does not support stdin.")
562 "frontend does not support stdin.")
771
563
772 def getpass(self, prompt=''):
564 def getpass(self, prompt=''):
773 """Forward getpass to frontends
565 """Forward getpass to frontends
774
566
775 Raises
567 Raises
776 ------
568 ------
777 StdinNotImplentedError if active frontend doesn't support stdin.
569 StdinNotImplentedError if active frontend doesn't support stdin.
778 """
570 """
779 if not self._allow_stdin:
571 if not self._allow_stdin:
780 raise StdinNotImplementedError(
572 raise StdinNotImplementedError(
781 "getpass was called, but this frontend does not support input requests."
573 "getpass was called, but this frontend does not support input requests."
782 )
574 )
783 return self._input_request(prompt,
575 return self._input_request(prompt,
784 self._parent_ident,
576 self._parent_ident,
785 self._parent_header,
577 self._parent_header,
786 password=True,
578 password=True,
787 )
579 )
788
580
789 def raw_input(self, prompt=''):
581 def raw_input(self, prompt=''):
790 """Forward raw_input to frontends
582 """Forward raw_input to frontends
791
583
792 Raises
584 Raises
793 ------
585 ------
794 StdinNotImplentedError if active frontend doesn't support stdin.
586 StdinNotImplentedError if active frontend doesn't support stdin.
795 """
587 """
796 if not self._allow_stdin:
588 if not self._allow_stdin:
797 raise StdinNotImplementedError(
589 raise StdinNotImplementedError(
798 "raw_input was called, but this frontend does not support input requests."
590 "raw_input was called, but this frontend does not support input requests."
799 )
591 )
800 return self._input_request(prompt,
592 return self._input_request(prompt,
801 self._parent_ident,
593 self._parent_ident,
802 self._parent_header,
594 self._parent_header,
803 password=False,
595 password=False,
804 )
596 )
805
597
806 def _input_request(self, prompt, ident, parent, password=False):
598 def _input_request(self, prompt, ident, parent, password=False):
807 # Flush output before making the request.
599 # Flush output before making the request.
808 sys.stderr.flush()
600 sys.stderr.flush()
809 sys.stdout.flush()
601 sys.stdout.flush()
810 # flush the stdin socket, to purge stale replies
602 # flush the stdin socket, to purge stale replies
811 while True:
603 while True:
812 try:
604 try:
813 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
605 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
814 except zmq.ZMQError as e:
606 except zmq.ZMQError as e:
815 if e.errno == zmq.EAGAIN:
607 if e.errno == zmq.EAGAIN:
816 break
608 break
817 else:
609 else:
818 raise
610 raise
819
611
820 # Send the input request.
612 # Send the input request.
821 content = json_clean(dict(prompt=prompt, password=password))
613 content = json_clean(dict(prompt=prompt, password=password))
822 self.session.send(self.stdin_socket, u'input_request', content, parent,
614 self.session.send(self.stdin_socket, u'input_request', content, parent,
823 ident=ident)
615 ident=ident)
824
616
825 # Await a response.
617 # Await a response.
826 while True:
618 while True:
827 try:
619 try:
828 ident, reply = self.session.recv(self.stdin_socket, 0)
620 ident, reply = self.session.recv(self.stdin_socket, 0)
829 except Exception:
621 except Exception:
830 self.log.warn("Invalid Message:", exc_info=True)
622 self.log.warn("Invalid Message:", exc_info=True)
831 except KeyboardInterrupt:
623 except KeyboardInterrupt:
832 # re-raise KeyboardInterrupt, to truncate traceback
624 # re-raise KeyboardInterrupt, to truncate traceback
833 raise KeyboardInterrupt
625 raise KeyboardInterrupt
834 else:
626 else:
835 break
627 break
836 try:
628 try:
837 value = py3compat.unicode_to_str(reply['content']['value'])
629 value = py3compat.unicode_to_str(reply['content']['value'])
838 except:
630 except:
839 self.log.error("Bad input_reply: %s", parent)
631 self.log.error("Bad input_reply: %s", parent)
840 value = ''
632 value = ''
841 if value == '\x04':
633 if value == '\x04':
842 # EOF
634 # EOF
843 raise EOFError
635 raise EOFError
844 return value
636 return value
845
637
846 def _at_shutdown(self):
638 def _at_shutdown(self):
847 """Actions taken at shutdown by the kernel, called by python's atexit.
639 """Actions taken at shutdown by the kernel, called by python's atexit.
848 """
640 """
849 # io.rprint("Kernel at_shutdown") # dbg
641 # io.rprint("Kernel at_shutdown") # dbg
850 if self._shutdown_message is not None:
642 if self._shutdown_message is not None:
851 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
643 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
852 self.log.debug("%s", self._shutdown_message)
644 self.log.debug("%s", self._shutdown_message)
853 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
645 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
854
646
@@ -1,342 +1,342
1 """Tests for asyncresult.py"""
1 """Tests for asyncresult.py"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import time
6 import time
7
7
8 import nose.tools as nt
8 import nose.tools as nt
9
9
10 from IPython.utils.io import capture_output
10 from IPython.utils.io import capture_output
11
11
12 from IPython.parallel.error import TimeoutError
12 from IPython.parallel.error import TimeoutError
13 from IPython.parallel import error, Client
13 from IPython.parallel import error, Client
14 from IPython.parallel.tests import add_engines
14 from IPython.parallel.tests import add_engines
15 from .clienttest import ClusterTestCase
15 from .clienttest import ClusterTestCase
16 from IPython.utils.py3compat import iteritems
16 from IPython.utils.py3compat import iteritems
17
17
18 def setup():
18 def setup():
19 add_engines(2, total=True)
19 add_engines(2, total=True)
20
20
21 def wait(n):
21 def wait(n):
22 import time
22 import time
23 time.sleep(n)
23 time.sleep(n)
24 return n
24 return n
25
25
26 def echo(x):
26 def echo(x):
27 return x
27 return x
28
28
29 class AsyncResultTest(ClusterTestCase):
29 class AsyncResultTest(ClusterTestCase):
30
30
31 def test_single_result_view(self):
31 def test_single_result_view(self):
32 """various one-target views get the right value for single_result"""
32 """various one-target views get the right value for single_result"""
33 eid = self.client.ids[-1]
33 eid = self.client.ids[-1]
34 ar = self.client[eid].apply_async(lambda : 42)
34 ar = self.client[eid].apply_async(lambda : 42)
35 self.assertEqual(ar.get(), 42)
35 self.assertEqual(ar.get(), 42)
36 ar = self.client[[eid]].apply_async(lambda : 42)
36 ar = self.client[[eid]].apply_async(lambda : 42)
37 self.assertEqual(ar.get(), [42])
37 self.assertEqual(ar.get(), [42])
38 ar = self.client[-1:].apply_async(lambda : 42)
38 ar = self.client[-1:].apply_async(lambda : 42)
39 self.assertEqual(ar.get(), [42])
39 self.assertEqual(ar.get(), [42])
40
40
41 def test_get_after_done(self):
41 def test_get_after_done(self):
42 ar = self.client[-1].apply_async(lambda : 42)
42 ar = self.client[-1].apply_async(lambda : 42)
43 ar.wait()
43 ar.wait()
44 self.assertTrue(ar.ready())
44 self.assertTrue(ar.ready())
45 self.assertEqual(ar.get(), 42)
45 self.assertEqual(ar.get(), 42)
46 self.assertEqual(ar.get(), 42)
46 self.assertEqual(ar.get(), 42)
47
47
48 def test_get_before_done(self):
48 def test_get_before_done(self):
49 ar = self.client[-1].apply_async(wait, 0.1)
49 ar = self.client[-1].apply_async(wait, 0.1)
50 self.assertRaises(TimeoutError, ar.get, 0)
50 self.assertRaises(TimeoutError, ar.get, 0)
51 ar.wait(0)
51 ar.wait(0)
52 self.assertFalse(ar.ready())
52 self.assertFalse(ar.ready())
53 self.assertEqual(ar.get(), 0.1)
53 self.assertEqual(ar.get(), 0.1)
54
54
55 def test_get_after_error(self):
55 def test_get_after_error(self):
56 ar = self.client[-1].apply_async(lambda : 1/0)
56 ar = self.client[-1].apply_async(lambda : 1/0)
57 ar.wait(10)
57 ar.wait(10)
58 self.assertRaisesRemote(ZeroDivisionError, ar.get)
58 self.assertRaisesRemote(ZeroDivisionError, ar.get)
59 self.assertRaisesRemote(ZeroDivisionError, ar.get)
59 self.assertRaisesRemote(ZeroDivisionError, ar.get)
60 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
60 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
61
61
62 def test_get_dict(self):
62 def test_get_dict(self):
63 n = len(self.client)
63 n = len(self.client)
64 ar = self.client[:].apply_async(lambda : 5)
64 ar = self.client[:].apply_async(lambda : 5)
65 self.assertEqual(ar.get(), [5]*n)
65 self.assertEqual(ar.get(), [5]*n)
66 d = ar.get_dict()
66 d = ar.get_dict()
67 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
67 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
68 for eid,r in iteritems(d):
68 for eid,r in iteritems(d):
69 self.assertEqual(r, 5)
69 self.assertEqual(r, 5)
70
70
71 def test_get_dict_single(self):
71 def test_get_dict_single(self):
72 view = self.client[-1]
72 view = self.client[-1]
73 for v in (list(range(5)), 5, ('abc', 'def'), 'string'):
73 for v in (list(range(5)), 5, ('abc', 'def'), 'string'):
74 ar = view.apply_async(echo, v)
74 ar = view.apply_async(echo, v)
75 self.assertEqual(ar.get(), v)
75 self.assertEqual(ar.get(), v)
76 d = ar.get_dict()
76 d = ar.get_dict()
77 self.assertEqual(d, {view.targets : v})
77 self.assertEqual(d, {view.targets : v})
78
78
79 def test_get_dict_bad(self):
79 def test_get_dict_bad(self):
80 ar = self.client[:].apply_async(lambda : 5)
80 ar = self.client[:].apply_async(lambda : 5)
81 ar2 = self.client[:].apply_async(lambda : 5)
81 ar2 = self.client[:].apply_async(lambda : 5)
82 ar = self.client.get_result(ar.msg_ids + ar2.msg_ids)
82 ar = self.client.get_result(ar.msg_ids + ar2.msg_ids)
83 self.assertRaises(ValueError, ar.get_dict)
83 self.assertRaises(ValueError, ar.get_dict)
84
84
85 def test_list_amr(self):
85 def test_list_amr(self):
86 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
86 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
87 rlist = list(ar)
87 rlist = list(ar)
88
88
89 def test_getattr(self):
89 def test_getattr(self):
90 ar = self.client[:].apply_async(wait, 0.5)
90 ar = self.client[:].apply_async(wait, 0.5)
91 self.assertEqual(ar.engine_id, [None] * len(ar))
91 self.assertEqual(ar.engine_id, [None] * len(ar))
92 self.assertRaises(AttributeError, lambda : ar._foo)
92 self.assertRaises(AttributeError, lambda : ar._foo)
93 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
93 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
94 self.assertRaises(AttributeError, lambda : ar.foo)
94 self.assertRaises(AttributeError, lambda : ar.foo)
95 self.assertFalse(hasattr(ar, '__length_hint__'))
95 self.assertFalse(hasattr(ar, '__length_hint__'))
96 self.assertFalse(hasattr(ar, 'foo'))
96 self.assertFalse(hasattr(ar, 'foo'))
97 self.assertTrue(hasattr(ar, 'engine_id'))
97 self.assertTrue(hasattr(ar, 'engine_id'))
98 ar.get(5)
98 ar.get(5)
99 self.assertRaises(AttributeError, lambda : ar._foo)
99 self.assertRaises(AttributeError, lambda : ar._foo)
100 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
100 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
101 self.assertRaises(AttributeError, lambda : ar.foo)
101 self.assertRaises(AttributeError, lambda : ar.foo)
102 self.assertTrue(isinstance(ar.engine_id, list))
102 self.assertTrue(isinstance(ar.engine_id, list))
103 self.assertEqual(ar.engine_id, ar['engine_id'])
103 self.assertEqual(ar.engine_id, ar['engine_id'])
104 self.assertFalse(hasattr(ar, '__length_hint__'))
104 self.assertFalse(hasattr(ar, '__length_hint__'))
105 self.assertFalse(hasattr(ar, 'foo'))
105 self.assertFalse(hasattr(ar, 'foo'))
106 self.assertTrue(hasattr(ar, 'engine_id'))
106 self.assertTrue(hasattr(ar, 'engine_id'))
107
107
108 def test_getitem(self):
108 def test_getitem(self):
109 ar = self.client[:].apply_async(wait, 0.5)
109 ar = self.client[:].apply_async(wait, 0.5)
110 self.assertEqual(ar['engine_id'], [None] * len(ar))
110 self.assertEqual(ar['engine_id'], [None] * len(ar))
111 self.assertRaises(KeyError, lambda : ar['foo'])
111 self.assertRaises(KeyError, lambda : ar['foo'])
112 ar.get(5)
112 ar.get(5)
113 self.assertRaises(KeyError, lambda : ar['foo'])
113 self.assertRaises(KeyError, lambda : ar['foo'])
114 self.assertTrue(isinstance(ar['engine_id'], list))
114 self.assertTrue(isinstance(ar['engine_id'], list))
115 self.assertEqual(ar.engine_id, ar['engine_id'])
115 self.assertEqual(ar.engine_id, ar['engine_id'])
116
116
117 def test_single_result(self):
117 def test_single_result(self):
118 ar = self.client[-1].apply_async(wait, 0.5)
118 ar = self.client[-1].apply_async(wait, 0.5)
119 self.assertRaises(KeyError, lambda : ar['foo'])
119 self.assertRaises(KeyError, lambda : ar['foo'])
120 self.assertEqual(ar['engine_id'], None)
120 self.assertEqual(ar['engine_id'], None)
121 self.assertTrue(ar.get(5) == 0.5)
121 self.assertTrue(ar.get(5) == 0.5)
122 self.assertTrue(isinstance(ar['engine_id'], int))
122 self.assertTrue(isinstance(ar['engine_id'], int))
123 self.assertTrue(isinstance(ar.engine_id, int))
123 self.assertTrue(isinstance(ar.engine_id, int))
124 self.assertEqual(ar.engine_id, ar['engine_id'])
124 self.assertEqual(ar.engine_id, ar['engine_id'])
125
125
126 def test_abort(self):
126 def test_abort(self):
127 e = self.client[-1]
127 e = self.client[-1]
128 ar = e.execute('import time; time.sleep(1)', block=False)
128 ar = e.execute('import time; time.sleep(1)', block=False)
129 ar2 = e.apply_async(lambda : 2)
129 ar2 = e.apply_async(lambda : 2)
130 ar2.abort()
130 ar2.abort()
131 self.assertRaises(error.TaskAborted, ar2.get)
131 self.assertRaises(error.TaskAborted, ar2.get)
132 ar.get()
132 ar.get()
133
133
134 def test_len(self):
134 def test_len(self):
135 v = self.client.load_balanced_view()
135 v = self.client.load_balanced_view()
136 ar = v.map_async(lambda x: x, list(range(10)))
136 ar = v.map_async(lambda x: x, list(range(10)))
137 self.assertEqual(len(ar), 10)
137 self.assertEqual(len(ar), 10)
138 ar = v.apply_async(lambda x: x, list(range(10)))
138 ar = v.apply_async(lambda x: x, list(range(10)))
139 self.assertEqual(len(ar), 1)
139 self.assertEqual(len(ar), 1)
140 ar = self.client[:].apply_async(lambda x: x, list(range(10)))
140 ar = self.client[:].apply_async(lambda x: x, list(range(10)))
141 self.assertEqual(len(ar), len(self.client.ids))
141 self.assertEqual(len(ar), len(self.client.ids))
142
142
143 def test_wall_time_single(self):
143 def test_wall_time_single(self):
144 v = self.client.load_balanced_view()
144 v = self.client.load_balanced_view()
145 ar = v.apply_async(time.sleep, 0.25)
145 ar = v.apply_async(time.sleep, 0.25)
146 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
146 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
147 ar.get(2)
147 ar.get(2)
148 self.assertTrue(ar.wall_time < 1.)
148 self.assertTrue(ar.wall_time < 1.)
149 self.assertTrue(ar.wall_time > 0.2)
149 self.assertTrue(ar.wall_time > 0.2)
150
150
151 def test_wall_time_multi(self):
151 def test_wall_time_multi(self):
152 self.minimum_engines(4)
152 self.minimum_engines(4)
153 v = self.client[:]
153 v = self.client[:]
154 ar = v.apply_async(time.sleep, 0.25)
154 ar = v.apply_async(time.sleep, 0.25)
155 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
155 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
156 ar.get(2)
156 ar.get(2)
157 self.assertTrue(ar.wall_time < 1.)
157 self.assertTrue(ar.wall_time < 1.)
158 self.assertTrue(ar.wall_time > 0.2)
158 self.assertTrue(ar.wall_time > 0.2)
159
159
160 def test_serial_time_single(self):
160 def test_serial_time_single(self):
161 v = self.client.load_balanced_view()
161 v = self.client.load_balanced_view()
162 ar = v.apply_async(time.sleep, 0.25)
162 ar = v.apply_async(time.sleep, 0.25)
163 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
163 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
164 ar.get(2)
164 ar.get(2)
165 self.assertTrue(ar.serial_time < 1.)
165 self.assertTrue(ar.serial_time < 1.)
166 self.assertTrue(ar.serial_time > 0.2)
166 self.assertTrue(ar.serial_time > 0.2)
167
167
168 def test_serial_time_multi(self):
168 def test_serial_time_multi(self):
169 self.minimum_engines(4)
169 self.minimum_engines(4)
170 v = self.client[:]
170 v = self.client[:]
171 ar = v.apply_async(time.sleep, 0.25)
171 ar = v.apply_async(time.sleep, 0.25)
172 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
172 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
173 ar.get(2)
173 ar.get(2)
174 self.assertTrue(ar.serial_time < 2.)
174 self.assertTrue(ar.serial_time < 2.)
175 self.assertTrue(ar.serial_time > 0.8)
175 self.assertTrue(ar.serial_time > 0.8)
176
176
177 def test_elapsed_single(self):
177 def test_elapsed_single(self):
178 v = self.client.load_balanced_view()
178 v = self.client.load_balanced_view()
179 ar = v.apply_async(time.sleep, 0.25)
179 ar = v.apply_async(time.sleep, 0.25)
180 while not ar.ready():
180 while not ar.ready():
181 time.sleep(0.01)
181 time.sleep(0.01)
182 self.assertTrue(ar.elapsed < 1)
182 self.assertTrue(ar.elapsed < 1)
183 self.assertTrue(ar.elapsed < 1)
183 self.assertTrue(ar.elapsed < 1)
184 ar.get(2)
184 ar.get(2)
185
185
186 def test_elapsed_multi(self):
186 def test_elapsed_multi(self):
187 v = self.client[:]
187 v = self.client[:]
188 ar = v.apply_async(time.sleep, 0.25)
188 ar = v.apply_async(time.sleep, 0.25)
189 while not ar.ready():
189 while not ar.ready():
190 time.sleep(0.01)
190 time.sleep(0.01)
191 self.assertTrue(ar.elapsed < 1)
191 self.assertLess(ar.elapsed, 1)
192 self.assertTrue(ar.elapsed < 1)
192 self.assertLess(ar.elapsed, 1)
193 ar.get(2)
193 ar.get(2)
194
194
195 def test_hubresult_timestamps(self):
195 def test_hubresult_timestamps(self):
196 self.minimum_engines(4)
196 self.minimum_engines(4)
197 v = self.client[:]
197 v = self.client[:]
198 ar = v.apply_async(time.sleep, 0.25)
198 ar = v.apply_async(time.sleep, 0.25)
199 ar.get(2)
199 ar.get(2)
200 rc2 = Client(profile='iptest')
200 rc2 = Client(profile='iptest')
201 # must have try/finally to close second Client, otherwise
201 # must have try/finally to close second Client, otherwise
202 # will have dangling sockets causing problems
202 # will have dangling sockets causing problems
203 try:
203 try:
204 time.sleep(0.25)
204 time.sleep(0.25)
205 hr = rc2.get_result(ar.msg_ids)
205 hr = rc2.get_result(ar.msg_ids)
206 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
206 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
207 hr.get(1)
207 hr.get(1)
208 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
208 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
209 self.assertEqual(hr.serial_time, ar.serial_time)
209 self.assertEqual(hr.serial_time, ar.serial_time)
210 finally:
210 finally:
211 rc2.close()
211 rc2.close()
212
212
213 def test_display_empty_streams_single(self):
213 def test_display_empty_streams_single(self):
214 """empty stdout/err are not displayed (single result)"""
214 """empty stdout/err are not displayed (single result)"""
215 self.minimum_engines(1)
215 self.minimum_engines(1)
216
216
217 v = self.client[-1]
217 v = self.client[-1]
218 ar = v.execute("print (5555)")
218 ar = v.execute("print (5555)")
219 ar.get(5)
219 ar.get(5)
220 with capture_output() as io:
220 with capture_output() as io:
221 ar.display_outputs()
221 ar.display_outputs()
222 self.assertEqual(io.stderr, '')
222 self.assertEqual(io.stderr, '')
223 self.assertEqual('5555\n', io.stdout)
223 self.assertEqual('5555\n', io.stdout)
224
224
225 ar = v.execute("a=5")
225 ar = v.execute("a=5")
226 ar.get(5)
226 ar.get(5)
227 with capture_output() as io:
227 with capture_output() as io:
228 ar.display_outputs()
228 ar.display_outputs()
229 self.assertEqual(io.stderr, '')
229 self.assertEqual(io.stderr, '')
230 self.assertEqual(io.stdout, '')
230 self.assertEqual(io.stdout, '')
231
231
232 def test_display_empty_streams_type(self):
232 def test_display_empty_streams_type(self):
233 """empty stdout/err are not displayed (groupby type)"""
233 """empty stdout/err are not displayed (groupby type)"""
234 self.minimum_engines(1)
234 self.minimum_engines(1)
235
235
236 v = self.client[:]
236 v = self.client[:]
237 ar = v.execute("print (5555)")
237 ar = v.execute("print (5555)")
238 ar.get(5)
238 ar.get(5)
239 with capture_output() as io:
239 with capture_output() as io:
240 ar.display_outputs()
240 ar.display_outputs()
241 self.assertEqual(io.stderr, '')
241 self.assertEqual(io.stderr, '')
242 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
242 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
243 self.assertFalse('\n\n' in io.stdout, io.stdout)
243 self.assertFalse('\n\n' in io.stdout, io.stdout)
244 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
244 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
245
245
246 ar = v.execute("a=5")
246 ar = v.execute("a=5")
247 ar.get(5)
247 ar.get(5)
248 with capture_output() as io:
248 with capture_output() as io:
249 ar.display_outputs()
249 ar.display_outputs()
250 self.assertEqual(io.stderr, '')
250 self.assertEqual(io.stderr, '')
251 self.assertEqual(io.stdout, '')
251 self.assertEqual(io.stdout, '')
252
252
253 def test_display_empty_streams_engine(self):
253 def test_display_empty_streams_engine(self):
254 """empty stdout/err are not displayed (groupby engine)"""
254 """empty stdout/err are not displayed (groupby engine)"""
255 self.minimum_engines(1)
255 self.minimum_engines(1)
256
256
257 v = self.client[:]
257 v = self.client[:]
258 ar = v.execute("print (5555)")
258 ar = v.execute("print (5555)")
259 ar.get(5)
259 ar.get(5)
260 with capture_output() as io:
260 with capture_output() as io:
261 ar.display_outputs('engine')
261 ar.display_outputs('engine')
262 self.assertEqual(io.stderr, '')
262 self.assertEqual(io.stderr, '')
263 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
263 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
264 self.assertFalse('\n\n' in io.stdout, io.stdout)
264 self.assertFalse('\n\n' in io.stdout, io.stdout)
265 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
265 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
266
266
267 ar = v.execute("a=5")
267 ar = v.execute("a=5")
268 ar.get(5)
268 ar.get(5)
269 with capture_output() as io:
269 with capture_output() as io:
270 ar.display_outputs('engine')
270 ar.display_outputs('engine')
271 self.assertEqual(io.stderr, '')
271 self.assertEqual(io.stderr, '')
272 self.assertEqual(io.stdout, '')
272 self.assertEqual(io.stdout, '')
273
273
274 def test_await_data(self):
274 def test_await_data(self):
275 """asking for ar.data flushes outputs"""
275 """asking for ar.data flushes outputs"""
276 self.minimum_engines(1)
276 self.minimum_engines(1)
277
277
278 v = self.client[-1]
278 v = self.client[-1]
279 ar = v.execute('\n'.join([
279 ar = v.execute('\n'.join([
280 "import time",
280 "import time",
281 "from IPython.kernel.zmq.datapub import publish_data",
281 "from IPython.kernel.zmq.datapub import publish_data",
282 "for i in range(5):",
282 "for i in range(5):",
283 " publish_data(dict(i=i))",
283 " publish_data(dict(i=i))",
284 " time.sleep(0.1)",
284 " time.sleep(0.1)",
285 ]), block=False)
285 ]), block=False)
286 found = set()
286 found = set()
287 tic = time.time()
287 tic = time.time()
288 # timeout after 10s
288 # timeout after 10s
289 while time.time() <= tic + 10:
289 while time.time() <= tic + 10:
290 if ar.data:
290 if ar.data:
291 i = ar.data['i']
291 i = ar.data['i']
292 found.add(i)
292 found.add(i)
293 if i == 4:
293 if i == 4:
294 break
294 break
295 time.sleep(0.05)
295 time.sleep(0.05)
296
296
297 ar.get(5)
297 ar.get(5)
298 nt.assert_in(4, found)
298 nt.assert_in(4, found)
299 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
299 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
300
300
301 def test_not_single_result(self):
301 def test_not_single_result(self):
302 save_build = self.client._build_targets
302 save_build = self.client._build_targets
303 def single_engine(*a, **kw):
303 def single_engine(*a, **kw):
304 idents, targets = save_build(*a, **kw)
304 idents, targets = save_build(*a, **kw)
305 return idents[:1], targets[:1]
305 return idents[:1], targets[:1]
306 ids = single_engine('all')[1]
306 ids = single_engine('all')[1]
307 self.client._build_targets = single_engine
307 self.client._build_targets = single_engine
308 for targets in ('all', None, ids):
308 for targets in ('all', None, ids):
309 dv = self.client.direct_view(targets=targets)
309 dv = self.client.direct_view(targets=targets)
310 ar = dv.apply_async(lambda : 5)
310 ar = dv.apply_async(lambda : 5)
311 self.assertEqual(ar.get(10), [5])
311 self.assertEqual(ar.get(10), [5])
312 self.client._build_targets = save_build
312 self.client._build_targets = save_build
313
313
314 def test_owner_pop(self):
314 def test_owner_pop(self):
315 self.minimum_engines(1)
315 self.minimum_engines(1)
316
316
317 view = self.client[-1]
317 view = self.client[-1]
318 ar = view.apply_async(lambda : 1)
318 ar = view.apply_async(lambda : 1)
319 ar.get()
319 ar.get()
320 msg_id = ar.msg_ids[0]
320 msg_id = ar.msg_ids[0]
321 self.assertNotIn(msg_id, self.client.results)
321 self.assertNotIn(msg_id, self.client.results)
322 self.assertNotIn(msg_id, self.client.metadata)
322 self.assertNotIn(msg_id, self.client.metadata)
323
323
324 def test_non_owner(self):
324 def test_non_owner(self):
325 self.minimum_engines(1)
325 self.minimum_engines(1)
326
326
327 view = self.client[-1]
327 view = self.client[-1]
328 ar = view.apply_async(lambda : 1)
328 ar = view.apply_async(lambda : 1)
329 ar.owner = False
329 ar.owner = False
330 ar.get()
330 ar.get()
331 msg_id = ar.msg_ids[0]
331 msg_id = ar.msg_ids[0]
332 self.assertIn(msg_id, self.client.results)
332 self.assertIn(msg_id, self.client.results)
333 self.assertIn(msg_id, self.client.metadata)
333 self.assertIn(msg_id, self.client.metadata)
334 ar2 = self.client.get_result(msg_id, owner=True)
334 ar2 = self.client.get_result(msg_id, owner=True)
335 self.assertIs(type(ar2), type(ar))
335 self.assertIs(type(ar2), type(ar))
336 self.assertTrue(ar2.owner)
336 self.assertTrue(ar2.owner)
337 self.assertEqual(ar.get(), ar2.get())
337 self.assertEqual(ar.get(), ar2.get())
338 ar2.get()
338 ar2.get()
339 self.assertNotIn(msg_id, self.client.results)
339 self.assertNotIn(msg_id, self.client.results)
340 self.assertNotIn(msg_id, self.client.metadata)
340 self.assertNotIn(msg_id, self.client.metadata)
341
341
342
342
General Comments 0
You need to be logged in to leave comments. Login now