Separate IPython kernel implementation from kernel machinery
Thomas Kluyver
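The practical effect of this split is that a kernel for another language only has to subclass KernelBase and fill in the do_* hooks (do_execute, do_complete, do_inspect, do_history, do_shutdown, do_apply, do_clear) plus the kernel-info attributes; the ZMQ sockets and message-protocol plumbing stay in the base class. Below is a minimal sketch of such a subclass. The EchoKernel name and its behaviour are hypothetical, invented for illustration; only the KernelBase hooks, attributes, and reply shapes come from the diff that follows.

from .kernelbase import KernelBase  # assumes the package layout introduced by this commit


class EchoKernel(KernelBase):
    """Hypothetical kernel used only to illustrate the do_* interface."""

    # Kernel info fields reported through kernel_info_request
    implementation = 'echo'
    implementation_version = '0.1'
    language = 'echo'
    language_version = '0.1'
    banner = 'Echo kernel (illustrative sketch)'

    def do_execute(self, code, silent, store_history=True,
                   user_expressions=None, allow_stdin=False):
        # A real kernel would run `code` here and publish its output on the
        # IOPub socket; this sketch only returns reply content in the shape
        # that KernelBase expects back from do_execute.
        return {'status': 'ok',
                'execution_count': self.execution_count,
                'payload': [],
                'user_expressions': {}}

    def do_shutdown(self, restart):
        # Mirrors the shape returned by the IPython Kernel's do_shutdown below.
        return {'status': 'ok', 'restart': restart}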
@@ -1,4 +1,286 b''
1 """The IPython kernel implementation"""
2
3 import getpass
4 import sys
5 import traceback
6
7 from IPython.core import release
8 from IPython.utils.py3compat import builtin_mod, PY3
9 from IPython.utils.tokenutil import token_at_cursor
10 from IPython.utils.traitlets import Instance, Type, Any
11
1 12 from .kernelbase import KernelBase
13 from .serialize import serialize_object, unpack_apply_message
14 from .zmqshell import ZMQInteractiveShell
2 15
3 16 class Kernel(KernelBase):
4 pass No newline at end of file
17 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
18 shell_class = Type(ZMQInteractiveShell)
19
20 user_module = Any()
21 def _user_module_changed(self, name, old, new):
22 if self.shell is not None:
23 self.shell.user_module = new
24
25 user_ns = Instance(dict, args=None, allow_none=True)
26 def _user_ns_changed(self, name, old, new):
27 if self.shell is not None:
28 self.shell.user_ns = new
29 self.shell.init_user_ns()
30
31 # A reference to the Python builtin 'raw_input' function.
32 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
33 _sys_raw_input = Any()
34 _sys_eval_input = Any()
35
36 def __init__(self, **kwargs):
37 super(Kernel, self).__init__(**kwargs)
38
39 # Initialize the InteractiveShell subclass
40 self.shell = self.shell_class.instance(parent=self,
41 profile_dir = self.profile_dir,
42 user_module = self.user_module,
43 user_ns = self.user_ns,
44 kernel = self,
45 )
46 self.shell.displayhook.session = self.session
47 self.shell.displayhook.pub_socket = self.iopub_socket
48 self.shell.displayhook.topic = self._topic('execute_result')
49 self.shell.display_pub.session = self.session
50 self.shell.display_pub.pub_socket = self.iopub_socket
51 self.shell.data_pub.session = self.session
52 self.shell.data_pub.pub_socket = self.iopub_socket
53
54 # TMP - hack while developing
55 self.shell._reply_content = None
56
57 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
58 comm_manager = self.shell.comm_manager
59 for msg_type in comm_msg_types:
60 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
61
62 # Kernel info fields
63 implementation = 'ipython'
64 implementation_version = release.version
65 language = 'python'
66 language_version = sys.version.split()[0]
67 @property
68 def banner(self):
69 return self.shell.banner
70
71 def set_parent(self, ident, parent):
72 """Overridden from parent to tell the display hook and output streams
73 about the parent message.
74 """
75 super(Kernel, self).set_parent(ident, parent)
76 self.shell.set_parent(parent)
77
78 def _forward_input(self, allow_stdin=False):
79 """Forward raw_input and getpass to the current frontend.
80
81 via input_request
82 """
83 self._allow_stdin = allow_stdin
84
85 if PY3:
86 self._sys_raw_input = builtin_mod.input
87 builtin_mod.input = self.raw_input
88 else:
89 self._sys_raw_input = builtin_mod.raw_input
90 self._sys_eval_input = builtin_mod.input
91 builtin_mod.raw_input = self.raw_input
92 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
93 self._save_getpass = getpass.getpass
94 getpass.getpass = self.getpass
95
96 def _restore_input(self):
97 """Restore raw_input, getpass"""
98 if PY3:
99 builtin_mod.input = self._sys_raw_input
100 else:
101 builtin_mod.raw_input = self._sys_raw_input
102 builtin_mod.input = self._sys_eval_input
103
104 getpass.getpass = self._save_getpass
105
106 @property
107 def execution_count(self):
108 return self.shell.execution_count
109
110 @execution_count.setter
111 def execution_count(self, value):
112 # Ignore the incrementing done by KernelBase, in favour of our shell's
113 # execution counter.
114 pass
115
116 def do_execute(self, code, silent, store_history=True,
117 user_expressions=None, allow_stdin=False):
118 shell = self.shell # we'll need this a lot here
119
120 self._forward_input(allow_stdin)
121
122 reply_content = {}
123 # FIXME: the shell calls the exception handler itself.
124 shell._reply_content = None
125 try:
126 shell.run_cell(code, store_history=store_history, silent=silent)
127 except:
128 status = u'error'
129 # FIXME: this code right now isn't being used yet by default,
130 # because the run_cell() call above directly fires off exception
131 # reporting. This code, therefore, is only active in the scenario
132 # where runlines itself has an unhandled exception. We need to
133 # uniformize this, for all exception construction to come from a
134 # single location in the codebase.
135 etype, evalue, tb = sys.exc_info()
136 tb_list = traceback.format_exception(etype, evalue, tb)
137 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
138 else:
139 status = u'ok'
140 finally:
141 self._restore_input()
142
143 reply_content[u'status'] = status
144
145 # Return the execution counter so clients can display prompts
146 reply_content['execution_count'] = shell.execution_count - 1
147
148 # FIXME - fish exception info out of shell, possibly left there by
149 # runlines. We'll need to clean up this logic later.
150 if shell._reply_content is not None:
151 reply_content.update(shell._reply_content)
152 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
153 reply_content['engine_info'] = e_info
154 # reset after use
155 shell._reply_content = None
156
157 if 'traceback' in reply_content:
158 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
159
160
161 # At this point, we can tell whether the main code execution succeeded
162 # or not. If it did, we proceed to evaluate user_expressions
163 if reply_content['status'] == 'ok':
164 reply_content[u'user_expressions'] = \
165 shell.user_expressions(user_expressions or {})
166 else:
167 # If there was an error, don't even try to compute expressions
168 reply_content[u'user_expressions'] = {}
169
170 # Payloads should be retrieved regardless of outcome, so we can both
171 # recover partial output (that could have been generated early in a
172 # block, before an error) and clear the payload system always.
173 reply_content[u'payload'] = shell.payload_manager.read_payload()
174 # Be aggressive about clearing the payload because we don't want
175 # it to sit in memory until the next execute_request comes in.
176 shell.payload_manager.clear_payload()
177
178 return reply_content
179
180 def do_complete(self, code, cursor_pos):
181 txt, matches = self.shell.complete('', code, cursor_pos)
182 return {'matches' : matches,
183 'cursor_end' : cursor_pos,
184 'cursor_start' : cursor_pos - len(txt),
185 'metadata' : {},
186 'status' : 'ok'}
187
188 def do_inspect(self, code, cursor_pos, detail_level=0):
189 name = token_at_cursor(code, cursor_pos)
190 info = self.shell.object_inspect(name)
191
192 reply_content = {'status' : 'ok'}
193 reply_content['data'] = data = {}
194 reply_content['metadata'] = {}
195 reply_content['found'] = info['found']
196 if info['found']:
197 info_text = self.shell.object_inspect_text(
198 name,
199 detail_level=detail_level,
200 )
201 data['text/plain'] = info_text
202
203 return reply_content
204
205 def do_history(self, hist_access_type, output, raw, session=None, start=None,
206 stop=None, n=None, pattern=None, unique=False):
207 if hist_access_type == 'tail':
208 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
209 include_latest=True)
210
211 elif hist_access_type == 'range':
212 hist = self.shell.history_manager.get_range(session, start, stop,
213 raw=raw, output=output)
214
215 elif hist_access_type == 'search':
216 hist = self.shell.history_manager.search(
217 pattern, raw=raw, output=output, n=n, unique=unique)
218 else:
219 hist = []
220
221 return {'history' : list(hist)}
222
223 def do_shutdown(self, restart):
224 self.shell.exit_now = True
225 return dict(status='ok', restart=restart)
226
227 def do_apply(self, content, bufs, msg_id, reply_metadata):
228 shell = self.shell
229 try:
230 working = shell.user_ns
231
232 prefix = "_"+str(msg_id).replace("-","")+"_"
233
234 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
235
236 fname = getattr(f, '__name__', 'f')
237
238 fname = prefix+"f"
239 argname = prefix+"args"
240 kwargname = prefix+"kwargs"
241 resultname = prefix+"result"
242
243 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
244 # print ns
245 working.update(ns)
246 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
247 try:
248 exec(code, shell.user_global_ns, shell.user_ns)
249 result = working.get(resultname)
250 finally:
251 for key in ns:
252 working.pop(key)
253
254 result_buf = serialize_object(result,
255 buffer_threshold=self.session.buffer_threshold,
256 item_threshold=self.session.item_threshold,
257 )
258
259 except:
260 # invoke IPython traceback formatting
261 shell.showtraceback()
262 # FIXME - fish exception info out of shell, possibly left there by
263 # run_code. We'll need to clean up this logic later.
264 reply_content = {}
265 if shell._reply_content is not None:
266 reply_content.update(shell._reply_content)
267 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
268 reply_content['engine_info'] = e_info
269 # reset after use
270 shell._reply_content = None
271
272 self.send_response(self.iopub_socket, u'error', reply_content,
273 ident=self._topic('error'))
274 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
275 result_buf = []
276
277 if reply_content['ename'] == 'UnmetDependency':
278 reply_metadata['dependencies_met'] = False
279 else:
280 reply_content = {'status' : 'ok'}
281
282 return reply_content, result_buf
283
284 def do_clear(self):
285 self.shell.reset(False)
286 return dict(status='ok')
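For orientation when reading the kernelbase.py hunk below: the request handlers keep all of the messaging concerns and hand the language-specific work to the new do_* hooks. The following is a condensed paraphrase of the execute_request flow in that hunk, not a verbatim excerpt; error handling, reply metadata, and output flushing are omitted.

    def execute_request(self, stream, ident, parent):
        """Condensed paraphrase of KernelBase.execute_request (see hunk below)."""
        content = parent['content']
        code = content['code']
        silent = content['silent']
        store_history = content.get('store_history', not silent)
        user_expressions = content.get('user_expressions', {})
        allow_stdin = content.get('allow_stdin', False)

        self._publish_status('busy', parent)   # tell clients we are working
        self.set_parent(ident, parent)         # route IOPub/stdin to this request

        if not silent:
            self.execution_count += 1
            self._publish_execute_input(code, parent, self.execution_count)

        # The subclass (e.g. the IPython Kernel above) does the actual execution.
        reply_content = self.do_execute(code, silent, store_history,
                                        user_expressions, allow_stdin)

        self.session.send(stream, 'execute_reply', reply_content,
                          parent, ident=ident)
        self._publish_status('idle', parent)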
@@ -1,854 +1,646 b''
1 """An interactive kernel that talks to frontends over 0MQ."""
1 """Base class for a kernel that talks to frontends over 0MQ."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import print_function
7 7
8 import getpass
9 8 import sys
10 9 import time
11 import traceback
12 10 import logging
13 11 import uuid
14 12
15 13 from datetime import datetime
16 14 from signal import (
17 15 signal, default_int_handler, SIGINT
18 16 )
19 17
20 18 import zmq
21 19 from zmq.eventloop import ioloop
22 20 from zmq.eventloop.zmqstream import ZMQStream
23 21
24 22 from IPython.config.configurable import Configurable
25 23 from IPython.core.error import StdinNotImplementedError
26 24 from IPython.core import release
27 25 from IPython.utils import py3compat
28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
26 from IPython.utils.py3compat import unicode_type, string_types
29 27 from IPython.utils.jsonutil import json_clean
30 from IPython.utils.tokenutil import token_at_cursor
31 28 from IPython.utils.traitlets import (
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
33 Type, Bool,
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
34 30 )
35 31
36 from .serialize import serialize_object, unpack_apply_message
37 32 from .session import Session
38 from .zmqshell import ZMQInteractiveShell
39 33
40 34
41 35 #-----------------------------------------------------------------------------
42 36 # Main kernel class
43 37 #-----------------------------------------------------------------------------
44 38
45 protocol_version = release.kernel_protocol_version
46 ipython_version = release.version
47 language_version = sys.version.split()[0]
48
49
50 39 class KernelBase(Configurable):
51 40
52 41 #---------------------------------------------------------------------------
53 42 # Kernel interface
54 43 #---------------------------------------------------------------------------
55 44
56 45 # attribute to override with a GUI
57 46 eventloop = Any(None)
58 47 def _eventloop_changed(self, name, old, new):
59 48 """schedule call to eventloop from IOLoop"""
60 49 loop = ioloop.IOLoop.instance()
61 50 loop.add_callback(self.enter_eventloop)
62 51
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 shell_class = Type(ZMQInteractiveShell)
65
66 52 session = Instance(Session)
67 53 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 54 shell_streams = List()
69 55 control_stream = Instance(ZMQStream)
70 56 iopub_socket = Instance(zmq.Socket)
71 57 stdin_socket = Instance(zmq.Socket)
72 58 log = Instance(logging.Logger)
73
74 user_module = Any()
75 def _user_module_changed(self, name, old, new):
76 if self.shell is not None:
77 self.shell.user_module = new
78
79 user_ns = Instance(dict, args=None, allow_none=True)
80 def _user_ns_changed(self, name, old, new):
81 if self.shell is not None:
82 self.shell.user_ns = new
83 self.shell.init_user_ns()
84 59
85 60 # identities:
86 61 int_id = Integer(-1)
87 62 ident = Unicode()
88 63
89 64 def _ident_default(self):
90 65 return unicode_type(uuid.uuid4())
91 66
92 67 # Private interface
93 68
94 69 _darwin_app_nap = Bool(True, config=True,
95 70 help="""Whether to use appnope for compatibility with OS X App Nap.
96 71
97 72 Only affects OS X >= 10.9.
98 73 """
99 74 )
100 75
101 76 # track associations with current request
102 77 _allow_stdin = Bool(False)
103 78 _parent_header = Dict()
104 79 _parent_ident = Any(b'')
105 80 # Time to sleep after flushing the stdout/err buffers in each execute
106 81 # cycle. While this introduces a hard limit on the minimal latency of the
107 82 # execute cycle, it helps prevent output synchronization problems for
108 83 # clients.
109 84 # Units are in seconds. The minimum zmq latency on local host is probably
110 85 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 86 # a little if it's not enough after more interactive testing.
112 87 _execute_sleep = Float(0.0005, config=True)
113 88
114 89 # Frequency of the kernel's event loop.
115 90 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 91 # adapt to milliseconds.
117 92 _poll_interval = Float(0.05, config=True)
118 93
119 94 # If the shutdown was requested over the network, we leave here the
120 95 # necessary reply message so it can be sent by our registered atexit
121 96 # handler. This ensures that the reply is only sent to clients truly at
122 97 # the end of our shutdown process (which happens after the underlying
123 98 # IPython shell's own shutdown).
124 99 _shutdown_message = None
125 100
126 101 # This is a dict of port number that the kernel is listening on. It is set
127 102 # by record_ports and used by connect_request.
128 103 _recorded_ports = Dict()
129 104
130 # A reference to the Python builtin 'raw_input' function.
131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 _sys_raw_input = Any()
133 _sys_eval_input = Any()
134
135 105 # set of aborted msg_ids
136 106 aborted = Set()
137 107
108 # Track execution count here. For IPython, we override this to use the
109 # execution count we store in the shell.
110 execution_count = 0
111
138 112
139 113 def __init__(self, **kwargs):
140 114 super(KernelBase, self).__init__(**kwargs)
141 115
142 # Initialize the InteractiveShell subclass
143 self.shell = self.shell_class.instance(parent=self,
144 profile_dir = self.profile_dir,
145 user_module = self.user_module,
146 user_ns = self.user_ns,
147 kernel = self,
148 )
149 self.shell.displayhook.session = self.session
150 self.shell.displayhook.pub_socket = self.iopub_socket
151 self.shell.displayhook.topic = self._topic('execute_result')
152 self.shell.display_pub.session = self.session
153 self.shell.display_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.session = self.session
155 self.shell.data_pub.pub_socket = self.iopub_socket
156
157 # TMP - hack while developing
158 self.shell._reply_content = None
159
160 116 # Build dict of handlers for message types
161 117 msg_types = [ 'execute_request', 'complete_request',
162 118 'inspect_request', 'history_request',
163 119 'kernel_info_request',
164 120 'connect_request', 'shutdown_request',
165 121 'apply_request',
166 122 ]
167 123 self.shell_handlers = {}
168 124 for msg_type in msg_types:
169 125 self.shell_handlers[msg_type] = getattr(self, msg_type)
170 126
171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
172 comm_manager = self.shell.comm_manager
173 for msg_type in comm_msg_types:
174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
175
176 127 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
177 128 self.control_handlers = {}
178 129 for msg_type in control_msg_types:
179 130 self.control_handlers[msg_type] = getattr(self, msg_type)
180 131
181 132
182 133 def dispatch_control(self, msg):
183 134 """dispatch control requests"""
184 135 idents,msg = self.session.feed_identities(msg, copy=False)
185 136 try:
186 137 msg = self.session.unserialize(msg, content=True, copy=False)
187 138 except:
188 139 self.log.error("Invalid Control Message", exc_info=True)
189 140 return
190 141
191 142 self.log.debug("Control received: %s", msg)
192 143
193 144 header = msg['header']
194 msg_id = header['msg_id']
195 145 msg_type = header['msg_type']
196 146
197 147 handler = self.control_handlers.get(msg_type, None)
198 148 if handler is None:
199 149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
200 150 else:
201 151 try:
202 152 handler(self.control_stream, idents, msg)
203 153 except Exception:
204 154 self.log.error("Exception in control handler:", exc_info=True)
205 155
206 156 def dispatch_shell(self, stream, msg):
207 157 """dispatch shell requests"""
208 158 # flush control requests first
209 159 if self.control_stream:
210 160 self.control_stream.flush()
211 161
212 162 idents,msg = self.session.feed_identities(msg, copy=False)
213 163 try:
214 164 msg = self.session.unserialize(msg, content=True, copy=False)
215 165 except:
216 166 self.log.error("Invalid Message", exc_info=True)
217 167 return
218 168
219 169 header = msg['header']
220 170 msg_id = header['msg_id']
221 171 msg_type = msg['header']['msg_type']
222 172
223 173 # Print some info about this message and leave a '--->' marker, so it's
224 174 # easier to trace visually the message chain when debugging. Each
225 175 # handler prints its message at the end.
226 176 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
227 177 self.log.debug(' Content: %s\n --->\n ', msg['content'])
228 178
229 179 if msg_id in self.aborted:
230 180 self.aborted.remove(msg_id)
231 181 # is it safe to assume a msg_id will not be resubmitted?
232 182 reply_type = msg_type.split('_')[0] + '_reply'
233 183 status = {'status' : 'aborted'}
234 184 md = {'engine' : self.ident}
235 185 md.update(status)
236 reply_msg = self.session.send(stream, reply_type, metadata=md,
186 self.session.send(stream, reply_type, metadata=md,
237 187 content=status, parent=msg, ident=idents)
238 188 return
239 189
240 190 handler = self.shell_handlers.get(msg_type, None)
241 191 if handler is None:
242 192 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
243 193 else:
244 194 # ensure default_int_handler during handler call
245 195 sig = signal(SIGINT, default_int_handler)
246 196 self.log.debug("%s: %s", msg_type, msg)
247 197 try:
248 198 handler(stream, idents, msg)
249 199 except Exception:
250 200 self.log.error("Exception in message handler:", exc_info=True)
251 201 finally:
252 202 signal(SIGINT, sig)
253 203
254 204 def enter_eventloop(self):
255 205 """enter eventloop"""
256 206 self.log.info("entering eventloop %s", self.eventloop)
257 207 for stream in self.shell_streams:
258 208 # flush any pending replies,
259 209 # which may be skipped by entering the eventloop
260 210 stream.flush(zmq.POLLOUT)
261 211 # restore default_int_handler
262 212 signal(SIGINT, default_int_handler)
263 213 while self.eventloop is not None:
264 214 try:
265 215 self.eventloop(self)
266 216 except KeyboardInterrupt:
267 217 # Ctrl-C shouldn't crash the kernel
268 218 self.log.error("KeyboardInterrupt caught in kernel")
269 219 continue
270 220 else:
271 221 # eventloop exited cleanly, this means we should stop (right?)
272 222 self.eventloop = None
273 223 break
274 224 self.log.info("exiting eventloop")
275 225
276 226 def start(self):
277 227 """register dispatchers for streams"""
278 228 self.shell.exit_now = False
279 229 if self.control_stream:
280 230 self.control_stream.on_recv(self.dispatch_control, copy=False)
281 231
282 232 def make_dispatcher(stream):
283 233 def dispatcher(msg):
284 234 return self.dispatch_shell(stream, msg)
285 235 return dispatcher
286 236
287 237 for s in self.shell_streams:
288 238 s.on_recv(make_dispatcher(s), copy=False)
289 239
290 240 # publish idle status
291 241 self._publish_status('starting')
292 242
293 243 def do_one_iteration(self):
294 244 """step eventloop just once"""
295 245 if self.control_stream:
296 246 self.control_stream.flush()
297 247 for stream in self.shell_streams:
298 248 # handle at most one request per iteration
299 249 stream.flush(zmq.POLLIN, 1)
300 250 stream.flush(zmq.POLLOUT)
301 251
302 252
303 253 def record_ports(self, ports):
304 254 """Record the ports that this kernel is using.
305 255
306 256 The creator of the Kernel instance must call this methods if they
307 257 want the :meth:`connect_request` method to return the port numbers.
308 258 """
309 259 self._recorded_ports = ports
310 260
311 261 #---------------------------------------------------------------------------
312 262 # Kernel request handlers
313 263 #---------------------------------------------------------------------------
314 264
315 265 def _make_metadata(self, other=None):
316 266 """init metadata dict, for execute/apply_reply"""
317 267 new_md = {
318 268 'dependencies_met' : True,
319 269 'engine' : self.ident,
320 270 'started': datetime.now(),
321 271 }
322 272 if other:
323 273 new_md.update(other)
324 274 return new_md
325 275
326 276 def _publish_execute_input(self, code, parent, execution_count):
327 277 """Publish the code request on the iopub stream."""
328 278
329 279 self.session.send(self.iopub_socket, u'execute_input',
330 280 {u'code':code, u'execution_count': execution_count},
331 281 parent=parent, ident=self._topic('execute_input')
332 282 )
333 283
334 284 def _publish_status(self, status, parent=None):
335 285 """send status (busy/idle) on IOPub"""
336 286 self.session.send(self.iopub_socket,
337 287 u'status',
338 288 {u'execution_state': status},
339 289 parent=parent,
340 290 ident=self._topic('status'),
341 291 )
342 292
343 def _forward_input(self, allow_stdin=False):
344 """Forward raw_input and getpass to the current frontend.
345
346 via input_request
347 """
348 self._allow_stdin = allow_stdin
349
350 if py3compat.PY3:
351 self._sys_raw_input = builtin_mod.input
352 builtin_mod.input = self.raw_input
353 else:
354 self._sys_raw_input = builtin_mod.raw_input
355 self._sys_eval_input = builtin_mod.input
356 builtin_mod.raw_input = self.raw_input
357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
358 self._save_getpass = getpass.getpass
359 getpass.getpass = self.getpass
360
361 def _restore_input(self):
362 """Restore raw_input, getpass"""
363 if py3compat.PY3:
364 builtin_mod.input = self._sys_raw_input
365 else:
366 builtin_mod.raw_input = self._sys_raw_input
367 builtin_mod.input = self._sys_eval_input
368
369 getpass.getpass = self._save_getpass
370
371 293 def set_parent(self, ident, parent):
372 294 """Set the current parent_header
373 295
374 296 Side effects (IOPub messages) and replies are associated with
375 297 the request that caused them via the parent_header.
376 298
377 299 The parent identity is used to route input_request messages
378 300 on the stdin channel.
379 301 """
380 302 self._parent_ident = ident
381 303 self._parent_header = parent
382 self.shell.set_parent(parent)
304
305 def send_response(self, stream, msg_or_type, content=None, ident=None,
306 buffers=None, track=False, header=None, metadata=None):
307 """Send a response to the message we're currently processing.
308
309 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
310 except ``parent``.
311
312 This relies on :meth:`set_parent` having been called for the current
313 message.
314 """
315 return self.session.send(stream, msg_or_type, content, self._parent_header,
316 ident, buffers, track, header, metadata)
383 317
384 318 def execute_request(self, stream, ident, parent):
385 319 """handle an execute_request"""
386 320
387 321 self._publish_status(u'busy', parent)
388 322
389 323 try:
390 324 content = parent[u'content']
391 325 code = py3compat.cast_unicode_py2(content[u'code'])
392 326 silent = content[u'silent']
393 327 store_history = content.get(u'store_history', not silent)
328 user_expressions = content.get('user_expressions', {})
329 allow_stdin = content.get('allow_stdin', False)
394 330 except:
395 331 self.log.error("Got bad msg: ")
396 332 self.log.error("%s", parent)
397 333 return
398 334
399 335 md = self._make_metadata(parent['metadata'])
400 336
401 shell = self.shell # we'll need this a lot here
402
403 self._forward_input(content.get('allow_stdin', False))
404 337 # Set the parent message of the display hook and out streams.
405 338 self.set_parent(ident, parent)
406 339
407 340 # Re-broadcast our input for the benefit of listening clients, and
408 341 # start computing output
409 342 if not silent:
410 self._publish_execute_input(code, parent, shell.execution_count)
411
412 reply_content = {}
413 # FIXME: the shell calls the exception handler itself.
414 shell._reply_content = None
415 try:
416 shell.run_cell(code, store_history=store_history, silent=silent)
417 except:
418 status = u'error'
419 # FIXME: this code right now isn't being used yet by default,
420 # because the run_cell() call above directly fires off exception
421 # reporting. This code, therefore, is only active in the scenario
422 # where runlines itself has an unhandled exception. We need to
423 # uniformize this, for all exception construction to come from a
424 # single location in the codbase.
425 etype, evalue, tb = sys.exc_info()
426 tb_list = traceback.format_exception(etype, evalue, tb)
427 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
428 else:
429 status = u'ok'
430 finally:
431 self._restore_input()
432
433 reply_content[u'status'] = status
434
435 # Return the execution counter so clients can display prompts
436 reply_content['execution_count'] = shell.execution_count - 1
437
438 # FIXME - fish exception info out of shell, possibly left there by
439 # runlines. We'll need to clean up this logic later.
440 if shell._reply_content is not None:
441 reply_content.update(shell._reply_content)
442 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
443 reply_content['engine_info'] = e_info
444 # reset after use
445 shell._reply_content = None
446
447 if 'traceback' in reply_content:
448 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
343 self.execution_count += 1
344 self._publish_execute_input(code, parent, self.execution_count)
449 345
450
451 # At this point, we can tell whether the main code execution succeeded
452 # or not. If it did, we proceed to evaluate user_expressions
453 if reply_content['status'] == 'ok':
454 reply_content[u'user_expressions'] = \
455 shell.user_expressions(content.get(u'user_expressions', {}))
456 else:
457 # If there was an error, don't even try to compute expressions
458 reply_content[u'user_expressions'] = {}
459
460 # Payloads should be retrieved regardless of outcome, so we can both
461 # recover partial output (that could have been generated early in a
462 # block, before an error) and clear the payload system always.
463 reply_content[u'payload'] = shell.payload_manager.read_payload()
464 # Be agressive about clearing the payload because we don't want
465 # it to sit in memory until the next execute_request comes in.
466 shell.payload_manager.clear_payload()
346 reply_content = self.do_execute(code, silent, store_history,
347 user_expressions, allow_stdin)
467 348
468 349 # Flush output before sending the reply.
469 350 sys.stdout.flush()
470 351 sys.stderr.flush()
471 352 # FIXME: on rare occasions, the flush doesn't seem to make it to the
472 353 # clients... This seems to mitigate the problem, but we definitely need
473 354 # to better understand what's going on.
474 355 if self._execute_sleep:
475 356 time.sleep(self._execute_sleep)
476 357
477 358 # Send the reply.
478 359 reply_content = json_clean(reply_content)
479 360
480 361 md['status'] = reply_content['status']
481 362 if reply_content['status'] == 'error' and \
482 363 reply_content['ename'] == 'UnmetDependency':
483 364 md['dependencies_met'] = False
484 365
485 366 reply_msg = self.session.send(stream, u'execute_reply',
486 367 reply_content, parent, metadata=md,
487 368 ident=ident)
488 369
489 370 self.log.debug("%s", reply_msg)
490 371
491 372 if not silent and reply_msg['content']['status'] == u'error':
492 373 self._abort_queues()
493 374
494 375 self._publish_status(u'idle', parent)
495 376
496 377 def complete_request(self, stream, ident, parent):
497 378 content = parent['content']
498 379 code = content['code']
499 380 cursor_pos = content['cursor_pos']
500 381
501 txt, matches = self.shell.complete('', code, cursor_pos)
502 matches = {'matches' : matches,
503 'cursor_end' : cursor_pos,
504 'cursor_start' : cursor_pos - len(txt),
505 'metadata' : {},
506 'status' : 'ok'}
382 matches = self.do_complete(code, cursor_pos)
507 383 matches = json_clean(matches)
508 384 completion_msg = self.session.send(stream, 'complete_reply',
509 385 matches, parent, ident)
510 386 self.log.debug("%s", completion_msg)
511 387
512 388 def inspect_request(self, stream, ident, parent):
513 389 content = parent['content']
514 390
515 name = token_at_cursor(content['code'], content['cursor_pos'])
516 info = self.shell.object_inspect(name)
517
518 reply_content = {'status' : 'ok'}
519 reply_content['data'] = data = {}
520 reply_content['metadata'] = {}
521 reply_content['found'] = info['found']
522 if info['found']:
523 info_text = self.shell.object_inspect_text(
524 name,
525 detail_level=content.get('detail_level', 0),
526 )
527 reply_content['data']['text/plain'] = info_text
391 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
392 content.get('detail_level', 0))
528 393 # Before we send this object over, we scrub it for JSON usage
529 394 reply_content = json_clean(reply_content)
530 395 msg = self.session.send(stream, 'inspect_reply',
531 396 reply_content, parent, ident)
532 397 self.log.debug("%s", msg)
533 398
534 399 def history_request(self, stream, ident, parent):
535 400 # We need to pull these out, as passing **kwargs doesn't work with
536 401 # unicode keys before Python 2.6.5.
537 hist_access_type = parent['content']['hist_access_type']
538 raw = parent['content']['raw']
539 output = parent['content']['output']
540 if hist_access_type == 'tail':
541 n = parent['content']['n']
542 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
543 include_latest=True)
544
545 elif hist_access_type == 'range':
546 session = parent['content']['session']
547 start = parent['content']['start']
548 stop = parent['content']['stop']
549 hist = self.shell.history_manager.get_range(session, start, stop,
550 raw=raw, output=output)
551
552 elif hist_access_type == 'search':
553 n = parent['content'].get('n')
554 unique = parent['content'].get('unique', False)
555 pattern = parent['content']['pattern']
556 hist = self.shell.history_manager.search(
557 pattern, raw=raw, output=output, n=n, unique=unique)
402 content = parent['content']
558 403
559 else:
560 hist = []
561 hist = list(hist)
562 content = {'history' : hist}
563 content = json_clean(content)
404 reply_content = self.do_history(content['hist_access_type'],
405 content['output'], content['raw'],
406 content.get('session', None),
407 content.get('start', None),
408 content.get('stop', None),
409 content.get('n', None),
410 content.get('pattern', None),
411 content.get('unique', False),
412 )
413
414 reply_content = json_clean(reply_content)
564 415 msg = self.session.send(stream, 'history_reply',
565 content, parent, ident)
566 self.log.debug("Sending history reply with %i entries", len(hist))
416 reply_content, parent, ident)
417 self.log.debug("%s", msg)
567 418
568 419 def connect_request(self, stream, ident, parent):
569 420 if self._recorded_ports is not None:
570 421 content = self._recorded_ports.copy()
571 422 else:
572 423 content = {}
573 424 msg = self.session.send(stream, 'connect_reply',
574 425 content, parent, ident)
575 426 self.log.debug("%s", msg)
576 427
577 def kernel_info_request(self, stream, ident, parent):
578 vinfo = {
579 'protocol_version': protocol_version,
580 'implementation': 'ipython',
581 'implementation_version': ipython_version,
582 'language_version': language_version,
583 'language': 'python',
584 'banner': self.shell.banner,
428 @property
429 def kernel_info(self):
430 return {
431 'protocol_version': release.kernel_protocol_version,
432 'implementation': self.implementation,
433 'implementation_version': self.implementation_version,
434 'language': self.language,
435 'language_version': self.language_version,
436 'banner': self.banner,
585 437 }
438
439 def kernel_info_request(self, stream, ident, parent):
586 440 msg = self.session.send(stream, 'kernel_info_reply',
587 vinfo, parent, ident)
441 self.kernel_info, parent, ident)
588 442 self.log.debug("%s", msg)
589 443
590 444 def shutdown_request(self, stream, ident, parent):
591 self.shell.exit_now = True
592 content = dict(status='ok')
593 content.update(parent['content'])
445 content = self.do_shutdown(parent['content']['restart'])
594 446 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
595 447 # same content, but different msg_id for broadcasting on IOPub
596 448 self._shutdown_message = self.session.msg(u'shutdown_reply',
597 449 content, parent
598 450 )
599 451
600 452 self._at_shutdown()
601 453 # call sys.exit after a short delay
602 454 loop = ioloop.IOLoop.instance()
603 455 loop.add_timeout(time.time()+0.1, loop.stop)
604 456
605 457 #---------------------------------------------------------------------------
606 458 # Engine methods
607 459 #---------------------------------------------------------------------------
608 460
609 461 def apply_request(self, stream, ident, parent):
610 462 try:
611 463 content = parent[u'content']
612 464 bufs = parent[u'buffers']
613 465 msg_id = parent['header']['msg_id']
614 466 except:
615 467 self.log.error("Got bad msg: %s", parent, exc_info=True)
616 468 return
617 469
618 470 self._publish_status(u'busy', parent)
619 471
620 472 # Set the parent message of the display hook and out streams.
621 shell = self.shell
622 shell.set_parent(parent)
473 self.set_parent(ident, parent)
623 474
624 475 md = self._make_metadata(parent['metadata'])
625 try:
626 working = shell.user_ns
627
628 prefix = "_"+str(msg_id).replace("-","")+"_"
629
630 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
631
632 fname = getattr(f, '__name__', 'f')
633 476
634 fname = prefix+"f"
635 argname = prefix+"args"
636 kwargname = prefix+"kwargs"
637 resultname = prefix+"result"
638
639 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
640 # print ns
641 working.update(ns)
642 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
643 try:
644 exec(code, shell.user_global_ns, shell.user_ns)
645 result = working.get(resultname)
646 finally:
647 for key in ns:
648 working.pop(key)
649
650 result_buf = serialize_object(result,
651 buffer_threshold=self.session.buffer_threshold,
652 item_threshold=self.session.item_threshold,
653 )
654
655 except:
656 # invoke IPython traceback formatting
657 shell.showtraceback()
658 # FIXME - fish exception info out of shell, possibly left there by
659 # run_code. We'll need to clean up this logic later.
660 reply_content = {}
661 if shell._reply_content is not None:
662 reply_content.update(shell._reply_content)
663 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
664 reply_content['engine_info'] = e_info
665 # reset after use
666 shell._reply_content = None
667
668 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
669 ident=self._topic('error'))
670 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
671 result_buf = []
672
673 if reply_content['ename'] == 'UnmetDependency':
674 md['dependencies_met'] = False
675 else:
676 reply_content = {'status' : 'ok'}
477 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
677 478
678 479 # put 'ok'/'error' status in header, for scheduler introspection:
679 480 md['status'] = reply_content['status']
680 481
681 482 # flush i/o
682 483 sys.stdout.flush()
683 484 sys.stderr.flush()
684 485
685 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
486 self.session.send(stream, u'apply_reply', reply_content,
686 487 parent=parent, ident=ident,buffers=result_buf, metadata=md)
687 488
688 489 self._publish_status(u'idle', parent)
689 490
690 491 #---------------------------------------------------------------------------
691 492 # Control messages
692 493 #---------------------------------------------------------------------------
693 494
694 495 def abort_request(self, stream, ident, parent):
695 496 """abort a specifig msg by id"""
696 497 msg_ids = parent['content'].get('msg_ids', None)
697 498 if isinstance(msg_ids, string_types):
698 499 msg_ids = [msg_ids]
699 500 if not msg_ids:
700 501 self.abort_queues()
701 502 for mid in msg_ids:
702 503 self.aborted.add(str(mid))
703 504
704 505 content = dict(status='ok')
705 506 reply_msg = self.session.send(stream, 'abort_reply', content=content,
706 507 parent=parent, ident=ident)
707 508 self.log.debug("%s", reply_msg)
708 509
709 510 def clear_request(self, stream, idents, parent):
710 511 """Clear our namespace."""
711 self.shell.reset(False)
712 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
713 content = dict(status='ok'))
714
512 content = self.do_clear()
513 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
514 content = content)
715 515
716 516 #---------------------------------------------------------------------------
717 517 # Protected interface
718 518 #---------------------------------------------------------------------------
719 519
720 def _wrap_exception(self, method=None):
721 # import here, because _wrap_exception is only used in parallel,
722 # and parallel has higher min pyzmq version
723 from IPython.parallel.error import wrap_exception
724 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
725 content = wrap_exception(e_info)
726 return content
727
728 520 def _topic(self, topic):
729 521 """prefixed topic for IOPub messages"""
730 522 if self.int_id >= 0:
731 523 base = "engine.%i" % self.int_id
732 524 else:
733 525 base = "kernel.%s" % self.ident
734 526
735 527 return py3compat.cast_bytes("%s.%s" % (base, topic))
736 528
737 529 def _abort_queues(self):
738 530 for stream in self.shell_streams:
739 531 if stream:
740 532 self._abort_queue(stream)
741 533
742 534 def _abort_queue(self, stream):
743 535 poller = zmq.Poller()
744 536 poller.register(stream.socket, zmq.POLLIN)
745 537 while True:
746 538 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
747 539 if msg is None:
748 540 return
749 541
750 542 self.log.info("Aborting:")
751 543 self.log.info("%s", msg)
752 544 msg_type = msg['header']['msg_type']
753 545 reply_type = msg_type.split('_')[0] + '_reply'
754 546
755 547 status = {'status' : 'aborted'}
756 548 md = {'engine' : self.ident}
757 549 md.update(status)
758 550 reply_msg = self.session.send(stream, reply_type, metadata=md,
759 551 content=status, parent=msg, ident=idents)
760 552 self.log.debug("%s", reply_msg)
761 553 # We need to wait a bit for requests to come in. This can probably
762 554 # be set shorter for true asynchronous clients.
763 555 poller.poll(50)
764 556
765 557
766 558 def _no_raw_input(self):
767 559 """Raise StdinNotImplentedError if active frontend doesn't support
768 560 stdin."""
769 561 raise StdinNotImplementedError("raw_input was called, but this "
770 562 "frontend does not support stdin.")
771 563
772 564 def getpass(self, prompt=''):
773 565 """Forward getpass to frontends
774 566
775 567 Raises
776 568 ------
777 569 StdinNotImplementedError if active frontend doesn't support stdin.
778 570 """
779 571 if not self._allow_stdin:
780 572 raise StdinNotImplementedError(
781 573 "getpass was called, but this frontend does not support input requests."
782 574 )
783 575 return self._input_request(prompt,
784 576 self._parent_ident,
785 577 self._parent_header,
786 578 password=True,
787 579 )
788 580
789 581 def raw_input(self, prompt=''):
790 582 """Forward raw_input to frontends
791 583
792 584 Raises
793 585 ------
794 586 StdinNotImplementedError if active frontend doesn't support stdin.
795 587 """
796 588 if not self._allow_stdin:
797 589 raise StdinNotImplementedError(
798 590 "raw_input was called, but this frontend does not support input requests."
799 591 )
800 592 return self._input_request(prompt,
801 593 self._parent_ident,
802 594 self._parent_header,
803 595 password=False,
804 596 )
805 597
806 598 def _input_request(self, prompt, ident, parent, password=False):
807 599 # Flush output before making the request.
808 600 sys.stderr.flush()
809 601 sys.stdout.flush()
810 602 # flush the stdin socket, to purge stale replies
811 603 while True:
812 604 try:
813 605 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
814 606 except zmq.ZMQError as e:
815 607 if e.errno == zmq.EAGAIN:
816 608 break
817 609 else:
818 610 raise
819 611
820 612 # Send the input request.
821 613 content = json_clean(dict(prompt=prompt, password=password))
822 614 self.session.send(self.stdin_socket, u'input_request', content, parent,
823 615 ident=ident)
824 616
825 617 # Await a response.
826 618 while True:
827 619 try:
828 620 ident, reply = self.session.recv(self.stdin_socket, 0)
829 621 except Exception:
830 622 self.log.warn("Invalid Message:", exc_info=True)
831 623 except KeyboardInterrupt:
832 624 # re-raise KeyboardInterrupt, to truncate traceback
833 625 raise KeyboardInterrupt
834 626 else:
835 627 break
836 628 try:
837 629 value = py3compat.unicode_to_str(reply['content']['value'])
838 630 except:
839 631 self.log.error("Bad input_reply: %s", parent)
840 632 value = ''
841 633 if value == '\x04':
842 634 # EOF
843 635 raise EOFError
844 636 return value
845 637
846 638 def _at_shutdown(self):
847 639 """Actions taken at shutdown by the kernel, called by python's atexit.
848 640 """
849 641 # io.rprint("Kernel at_shutdown") # dbg
850 642 if self._shutdown_message is not None:
851 643 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
852 644 self.log.debug("%s", self._shutdown_message)
853 645 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
854 646
@@ -1,342 +1,342 b''
1 1 """Tests for asyncresult.py"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import time
7 7
8 8 import nose.tools as nt
9 9
10 10 from IPython.utils.io import capture_output
11 11
12 12 from IPython.parallel.error import TimeoutError
13 13 from IPython.parallel import error, Client
14 14 from IPython.parallel.tests import add_engines
15 15 from .clienttest import ClusterTestCase
16 16 from IPython.utils.py3compat import iteritems
17 17
18 18 def setup():
19 19 add_engines(2, total=True)
20 20
21 21 def wait(n):
22 22 import time
23 23 time.sleep(n)
24 24 return n
25 25
26 26 def echo(x):
27 27 return x
28 28
29 29 class AsyncResultTest(ClusterTestCase):
30 30
31 31 def test_single_result_view(self):
32 32 """various one-target views get the right value for single_result"""
33 33 eid = self.client.ids[-1]
34 34 ar = self.client[eid].apply_async(lambda : 42)
35 35 self.assertEqual(ar.get(), 42)
36 36 ar = self.client[[eid]].apply_async(lambda : 42)
37 37 self.assertEqual(ar.get(), [42])
38 38 ar = self.client[-1:].apply_async(lambda : 42)
39 39 self.assertEqual(ar.get(), [42])
40 40
41 41 def test_get_after_done(self):
42 42 ar = self.client[-1].apply_async(lambda : 42)
43 43 ar.wait()
44 44 self.assertTrue(ar.ready())
45 45 self.assertEqual(ar.get(), 42)
46 46 self.assertEqual(ar.get(), 42)
47 47
48 48 def test_get_before_done(self):
49 49 ar = self.client[-1].apply_async(wait, 0.1)
50 50 self.assertRaises(TimeoutError, ar.get, 0)
51 51 ar.wait(0)
52 52 self.assertFalse(ar.ready())
53 53 self.assertEqual(ar.get(), 0.1)
54 54
55 55 def test_get_after_error(self):
56 56 ar = self.client[-1].apply_async(lambda : 1/0)
57 57 ar.wait(10)
58 58 self.assertRaisesRemote(ZeroDivisionError, ar.get)
59 59 self.assertRaisesRemote(ZeroDivisionError, ar.get)
60 60 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
61 61
62 62 def test_get_dict(self):
63 63 n = len(self.client)
64 64 ar = self.client[:].apply_async(lambda : 5)
65 65 self.assertEqual(ar.get(), [5]*n)
66 66 d = ar.get_dict()
67 67 self.assertEqual(sorted(d.keys()), sorted(self.client.ids))
68 68 for eid,r in iteritems(d):
69 69 self.assertEqual(r, 5)
70 70
71 71 def test_get_dict_single(self):
72 72 view = self.client[-1]
73 73 for v in (list(range(5)), 5, ('abc', 'def'), 'string'):
74 74 ar = view.apply_async(echo, v)
75 75 self.assertEqual(ar.get(), v)
76 76 d = ar.get_dict()
77 77 self.assertEqual(d, {view.targets : v})
78 78
79 79 def test_get_dict_bad(self):
80 80 ar = self.client[:].apply_async(lambda : 5)
81 81 ar2 = self.client[:].apply_async(lambda : 5)
82 82 ar = self.client.get_result(ar.msg_ids + ar2.msg_ids)
83 83 self.assertRaises(ValueError, ar.get_dict)
84 84
85 85 def test_list_amr(self):
86 86 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
87 87 rlist = list(ar)
88 88
89 89 def test_getattr(self):
90 90 ar = self.client[:].apply_async(wait, 0.5)
91 91 self.assertEqual(ar.engine_id, [None] * len(ar))
92 92 self.assertRaises(AttributeError, lambda : ar._foo)
93 93 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
94 94 self.assertRaises(AttributeError, lambda : ar.foo)
95 95 self.assertFalse(hasattr(ar, '__length_hint__'))
96 96 self.assertFalse(hasattr(ar, 'foo'))
97 97 self.assertTrue(hasattr(ar, 'engine_id'))
98 98 ar.get(5)
99 99 self.assertRaises(AttributeError, lambda : ar._foo)
100 100 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
101 101 self.assertRaises(AttributeError, lambda : ar.foo)
102 102 self.assertTrue(isinstance(ar.engine_id, list))
103 103 self.assertEqual(ar.engine_id, ar['engine_id'])
104 104 self.assertFalse(hasattr(ar, '__length_hint__'))
105 105 self.assertFalse(hasattr(ar, 'foo'))
106 106 self.assertTrue(hasattr(ar, 'engine_id'))
107 107
108 108 def test_getitem(self):
109 109 ar = self.client[:].apply_async(wait, 0.5)
110 110 self.assertEqual(ar['engine_id'], [None] * len(ar))
111 111 self.assertRaises(KeyError, lambda : ar['foo'])
112 112 ar.get(5)
113 113 self.assertRaises(KeyError, lambda : ar['foo'])
114 114 self.assertTrue(isinstance(ar['engine_id'], list))
115 115 self.assertEqual(ar.engine_id, ar['engine_id'])
116 116
117 117 def test_single_result(self):
118 118 ar = self.client[-1].apply_async(wait, 0.5)
119 119 self.assertRaises(KeyError, lambda : ar['foo'])
120 120 self.assertEqual(ar['engine_id'], None)
121 121 self.assertTrue(ar.get(5) == 0.5)
122 122 self.assertTrue(isinstance(ar['engine_id'], int))
123 123 self.assertTrue(isinstance(ar.engine_id, int))
124 124 self.assertEqual(ar.engine_id, ar['engine_id'])
125 125
126 126 def test_abort(self):
127 127 e = self.client[-1]
128 128 ar = e.execute('import time; time.sleep(1)', block=False)
129 129 ar2 = e.apply_async(lambda : 2)
130 130 ar2.abort()
131 131 self.assertRaises(error.TaskAborted, ar2.get)
132 132 ar.get()
133 133
134 134 def test_len(self):
135 135 v = self.client.load_balanced_view()
136 136 ar = v.map_async(lambda x: x, list(range(10)))
137 137 self.assertEqual(len(ar), 10)
138 138 ar = v.apply_async(lambda x: x, list(range(10)))
139 139 self.assertEqual(len(ar), 1)
140 140 ar = self.client[:].apply_async(lambda x: x, list(range(10)))
141 141 self.assertEqual(len(ar), len(self.client.ids))
142 142
143 143 def test_wall_time_single(self):
144 144 v = self.client.load_balanced_view()
145 145 ar = v.apply_async(time.sleep, 0.25)
146 146 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
147 147 ar.get(2)
148 148 self.assertTrue(ar.wall_time < 1.)
149 149 self.assertTrue(ar.wall_time > 0.2)
150 150
151 151 def test_wall_time_multi(self):
152 152 self.minimum_engines(4)
153 153 v = self.client[:]
154 154 ar = v.apply_async(time.sleep, 0.25)
155 155 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
156 156 ar.get(2)
157 157 self.assertTrue(ar.wall_time < 1.)
158 158 self.assertTrue(ar.wall_time > 0.2)
159 159
160 160 def test_serial_time_single(self):
161 161 v = self.client.load_balanced_view()
162 162 ar = v.apply_async(time.sleep, 0.25)
163 163 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
164 164 ar.get(2)
165 165 self.assertTrue(ar.serial_time < 1.)
166 166 self.assertTrue(ar.serial_time > 0.2)
167 167
168 168 def test_serial_time_multi(self):
169 169 self.minimum_engines(4)
170 170 v = self.client[:]
171 171 ar = v.apply_async(time.sleep, 0.25)
172 172 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
173 173 ar.get(2)
174 174 self.assertTrue(ar.serial_time < 2.)
175 175 self.assertTrue(ar.serial_time > 0.8)
176 176
177 177 def test_elapsed_single(self):
178 178 v = self.client.load_balanced_view()
179 179 ar = v.apply_async(time.sleep, 0.25)
180 180 while not ar.ready():
181 181 time.sleep(0.01)
182 182 self.assertTrue(ar.elapsed < 1)
183 183 self.assertTrue(ar.elapsed < 1)
184 184 ar.get(2)
185 185
186 186 def test_elapsed_multi(self):
187 187 v = self.client[:]
188 188 ar = v.apply_async(time.sleep, 0.25)
189 189 while not ar.ready():
190 190 time.sleep(0.01)
191 self.assertTrue(ar.elapsed < 1)
192 self.assertTrue(ar.elapsed < 1)
191 self.assertLess(ar.elapsed, 1)
192 self.assertLess(ar.elapsed, 1)
193 193 ar.get(2)
194 194
195 195 def test_hubresult_timestamps(self):
196 196 self.minimum_engines(4)
197 197 v = self.client[:]
198 198 ar = v.apply_async(time.sleep, 0.25)
199 199 ar.get(2)
200 200 rc2 = Client(profile='iptest')
201 201 # must have try/finally to close second Client, otherwise
202 202 # will have dangling sockets causing problems
203 203 try:
204 204 time.sleep(0.25)
205 205 hr = rc2.get_result(ar.msg_ids)
206 206 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
207 207 hr.get(1)
208 208 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
209 209 self.assertEqual(hr.serial_time, ar.serial_time)
210 210 finally:
211 211 rc2.close()
212 212
213 213 def test_display_empty_streams_single(self):
214 214 """empty stdout/err are not displayed (single result)"""
215 215 self.minimum_engines(1)
216 216
217 217 v = self.client[-1]
218 218 ar = v.execute("print (5555)")
219 219 ar.get(5)
220 220 with capture_output() as io:
221 221 ar.display_outputs()
222 222 self.assertEqual(io.stderr, '')
223 223 self.assertEqual('5555\n', io.stdout)
224 224
225 225 ar = v.execute("a=5")
226 226 ar.get(5)
227 227 with capture_output() as io:
228 228 ar.display_outputs()
229 229 self.assertEqual(io.stderr, '')
230 230 self.assertEqual(io.stdout, '')
231 231
232 232 def test_display_empty_streams_type(self):
233 233 """empty stdout/err are not displayed (groupby type)"""
234 234 self.minimum_engines(1)
235 235
236 236 v = self.client[:]
237 237 ar = v.execute("print (5555)")
238 238 ar.get(5)
239 239 with capture_output() as io:
240 240 ar.display_outputs()
241 241 self.assertEqual(io.stderr, '')
242 242 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
243 243 self.assertFalse('\n\n' in io.stdout, io.stdout)
244 244 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
245 245
246 246 ar = v.execute("a=5")
247 247 ar.get(5)
248 248 with capture_output() as io:
249 249 ar.display_outputs()
250 250 self.assertEqual(io.stderr, '')
251 251 self.assertEqual(io.stdout, '')
252 252
253 253 def test_display_empty_streams_engine(self):
254 254 """empty stdout/err are not displayed (groupby engine)"""
255 255 self.minimum_engines(1)
256 256
257 257 v = self.client[:]
258 258 ar = v.execute("print (5555)")
259 259 ar.get(5)
260 260 with capture_output() as io:
261 261 ar.display_outputs('engine')
262 262 self.assertEqual(io.stderr, '')
263 263 self.assertEqual(io.stdout.count('5555'), len(v), io.stdout)
264 264 self.assertFalse('\n\n' in io.stdout, io.stdout)
265 265 self.assertEqual(io.stdout.count('[stdout:'), len(v), io.stdout)
266 266
267 267 ar = v.execute("a=5")
268 268 ar.get(5)
269 269 with capture_output() as io:
270 270 ar.display_outputs('engine')
271 271 self.assertEqual(io.stderr, '')
272 272 self.assertEqual(io.stdout, '')
273 273
274 274 def test_await_data(self):
275 275 """asking for ar.data flushes outputs"""
276 276 self.minimum_engines(1)
277 277
278 278 v = self.client[-1]
279 279 ar = v.execute('\n'.join([
280 280 "import time",
281 281 "from IPython.kernel.zmq.datapub import publish_data",
282 282 "for i in range(5):",
283 283 " publish_data(dict(i=i))",
284 284 " time.sleep(0.1)",
285 285 ]), block=False)
286 286 found = set()
287 287 tic = time.time()
288 288 # timeout after 10s
289 289 while time.time() <= tic + 10:
290 290 if ar.data:
291 291 i = ar.data['i']
292 292 found.add(i)
293 293 if i == 4:
294 294 break
295 295 time.sleep(0.05)
296 296
297 297 ar.get(5)
298 298 nt.assert_in(4, found)
299 299 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
300 300
301 301 def test_not_single_result(self):
302 302 save_build = self.client._build_targets
303 303 def single_engine(*a, **kw):
304 304 idents, targets = save_build(*a, **kw)
305 305 return idents[:1], targets[:1]
306 306 ids = single_engine('all')[1]
307 307 self.client._build_targets = single_engine
308 308 for targets in ('all', None, ids):
309 309 dv = self.client.direct_view(targets=targets)
310 310 ar = dv.apply_async(lambda : 5)
311 311 self.assertEqual(ar.get(10), [5])
312 312 self.client._build_targets = save_build
313 313
314 314 def test_owner_pop(self):
315 315 self.minimum_engines(1)
316 316
317 317 view = self.client[-1]
318 318 ar = view.apply_async(lambda : 1)
319 319 ar.get()
320 320 msg_id = ar.msg_ids[0]
321 321 self.assertNotIn(msg_id, self.client.results)
322 322 self.assertNotIn(msg_id, self.client.metadata)
323 323
324 324 def test_non_owner(self):
325 325 self.minimum_engines(1)
326 326
327 327 view = self.client[-1]
328 328 ar = view.apply_async(lambda : 1)
329 329 ar.owner = False
330 330 ar.get()
331 331 msg_id = ar.msg_ids[0]
332 332 self.assertIn(msg_id, self.client.results)
333 333 self.assertIn(msg_id, self.client.metadata)
334 334 ar2 = self.client.get_result(msg_id, owner=True)
335 335 self.assertIs(type(ar2), type(ar))
336 336 self.assertTrue(ar2.owner)
337 337 self.assertEqual(ar.get(), ar2.get())
338 338 ar2.get()
339 339 self.assertNotIn(msg_id, self.client.results)
340 340 self.assertNotIn(msg_id, self.client.metadata)
341 341
342 342