##// END OF EJS Templates
Merge pull request #1630 from minrk/mergekernel...
Brian E. Granger -
r6905:6cb2026f merge
parent child Browse files
Show More
@@ -0,0 +1,179 b''
1 """serialization utilities for apply messages
2
3 Authors:
4
5 * Min RK
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 # Standard library imports
19 import logging
20 import os
21 import re
22 import socket
23 import sys
24
25 try:
26 import cPickle
27 pickle = cPickle
28 except:
29 cPickle = None
30 import pickle
31
32
33 # IPython imports
34 from IPython.utils import py3compat
35 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
36 from IPython.utils.newserialized import serialize, unserialize
37
38 if py3compat.PY3:
39 buffer = memoryview
40
41 #-----------------------------------------------------------------------------
42 # Serialization Functions
43 #-----------------------------------------------------------------------------
44
45 def serialize_object(obj, threshold=64e-6):
46 """Serialize an object into a list of sendable buffers.
47
48 Parameters
49 ----------
50
51 obj : object
52 The object to be serialized
53 threshold : float
54 The threshold for not double-pickling the content.
55
56
57 Returns
58 -------
59 ('pmd', [bufs]) :
60 where pmd is the pickled metadata wrapper,
61 bufs is a list of data buffers
62 """
63 databuffers = []
64 if isinstance(obj, (list, tuple)):
65 clist = canSequence(obj)
66 slist = map(serialize, clist)
67 for s in slist:
68 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
69 databuffers.append(s.getData())
70 s.data = None
71 return pickle.dumps(slist,-1), databuffers
72 elif isinstance(obj, dict):
73 sobj = {}
74 for k in sorted(obj.iterkeys()):
75 s = serialize(can(obj[k]))
76 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
77 databuffers.append(s.getData())
78 s.data = None
79 sobj[k] = s
80 return pickle.dumps(sobj,-1),databuffers
81 else:
82 s = serialize(can(obj))
83 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
84 databuffers.append(s.getData())
85 s.data = None
86 return pickle.dumps(s,-1),databuffers
87
88
89 def unserialize_object(bufs):
90 """reconstruct an object serialized by serialize_object from data buffers."""
91 bufs = list(bufs)
92 sobj = pickle.loads(bufs.pop(0))
93 if isinstance(sobj, (list, tuple)):
94 for s in sobj:
95 if s.data is None:
96 s.data = bufs.pop(0)
97 return uncanSequence(map(unserialize, sobj)), bufs
98 elif isinstance(sobj, dict):
99 newobj = {}
100 for k in sorted(sobj.iterkeys()):
101 s = sobj[k]
102 if s.data is None:
103 s.data = bufs.pop(0)
104 newobj[k] = uncan(unserialize(s))
105 return newobj, bufs
106 else:
107 if sobj.data is None:
108 sobj.data = bufs.pop(0)
109 return uncan(unserialize(sobj)), bufs
110
111 def pack_apply_message(f, args, kwargs, threshold=64e-6):
112 """pack up a function, args, and kwargs to be sent over the wire
113 as a series of buffers. Any object whose data is larger than `threshold`
114 will not have their data copied (currently only numpy arrays support zero-copy)"""
115 msg = [pickle.dumps(can(f),-1)]
116 databuffers = [] # for large objects
117 sargs, bufs = serialize_object(args,threshold)
118 msg.append(sargs)
119 databuffers.extend(bufs)
120 skwargs, bufs = serialize_object(kwargs,threshold)
121 msg.append(skwargs)
122 databuffers.extend(bufs)
123 msg.extend(databuffers)
124 return msg
125
126 def unpack_apply_message(bufs, g=None, copy=True):
127 """unpack f,args,kwargs from buffers packed by pack_apply_message()
128 Returns: original f,args,kwargs"""
129 bufs = list(bufs) # allow us to pop
130 assert len(bufs) >= 3, "not enough buffers!"
131 if not copy:
132 for i in range(3):
133 bufs[i] = bufs[i].bytes
134 cf = pickle.loads(bufs.pop(0))
135 sargs = list(pickle.loads(bufs.pop(0)))
136 skwargs = dict(pickle.loads(bufs.pop(0)))
137 # print sargs, skwargs
138 f = uncan(cf, g)
139 for sa in sargs:
140 if sa.data is None:
141 m = bufs.pop(0)
142 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
143 # always use a buffer, until memoryviews get sorted out
144 sa.data = buffer(m)
145 # disable memoryview support
146 # if copy:
147 # sa.data = buffer(m)
148 # else:
149 # sa.data = m.buffer
150 else:
151 if copy:
152 sa.data = m
153 else:
154 sa.data = m.bytes
155
156 args = uncanSequence(map(unserialize, sargs), g)
157 kwargs = {}
158 for k in sorted(skwargs.iterkeys()):
159 sa = skwargs[k]
160 if sa.data is None:
161 m = bufs.pop(0)
162 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
163 # always use a buffer, until memoryviews get sorted out
164 sa.data = buffer(m)
165 # disable memoryview support
166 # if copy:
167 # sa.data = buffer(m)
168 # else:
169 # sa.data = m.buffer
170 else:
171 if copy:
172 sa.data = m
173 else:
174 sa.data = m.bytes
175
176 kwargs[k] = uncan(unserialize(sa), g)
177
178 return f,args,kwargs
179
@@ -133,6 +133,30 b' class Application(SingletonConfigurable):'
133 self.log_level = new
133 self.log_level = new
134 self.log.setLevel(new)
134 self.log.setLevel(new)
135
135
136 log_format = Unicode("[%(name)s] %(message)s", config=True,
137 help="The Logging format template",
138 )
139 log = Instance(logging.Logger)
140 def _log_default(self):
141 """Start logging for this application.
142
143 The default is to log to stdout using a StreaHandler. The log level
144 starts at loggin.WARN, but this can be adjusted by setting the
145 ``log_level`` attribute.
146 """
147 log = logging.getLogger(self.__class__.__name__)
148 log.setLevel(self.log_level)
149 if sys.executable.endswith('pythonw.exe'):
150 # this should really go to a file, but file-logging is only
151 # hooked up in parallel applications
152 _log_handler = logging.StreamHandler(open(os.devnull, 'w'))
153 else:
154 _log_handler = logging.StreamHandler()
155 _log_formatter = logging.Formatter(self.log_format)
156 _log_handler.setFormatter(_log_formatter)
157 log.addHandler(_log_handler)
158 return log
159
136 # the alias map for configurables
160 # the alias map for configurables
137 aliases = Dict({'log-level' : 'Application.log_level'})
161 aliases = Dict({'log-level' : 'Application.log_level'})
138
162
@@ -169,32 +193,11 b' class Application(SingletonConfigurable):'
169 if self.__class__ not in self.classes:
193 if self.__class__ not in self.classes:
170 self.classes.insert(0, self.__class__)
194 self.classes.insert(0, self.__class__)
171
195
172 self.init_logging()
173
174 def _config_changed(self, name, old, new):
196 def _config_changed(self, name, old, new):
175 SingletonConfigurable._config_changed(self, name, old, new)
197 SingletonConfigurable._config_changed(self, name, old, new)
176 self.log.debug('Config changed:')
198 self.log.debug('Config changed:')
177 self.log.debug(repr(new))
199 self.log.debug(repr(new))
178
200
179 def init_logging(self):
180 """Start logging for this application.
181
182 The default is to log to stdout using a StreaHandler. The log level
183 starts at loggin.WARN, but this can be adjusted by setting the
184 ``log_level`` attribute.
185 """
186 self.log = logging.getLogger(self.__class__.__name__)
187 self.log.setLevel(self.log_level)
188 if sys.executable.endswith('pythonw.exe'):
189 # this should really go to a file, but file-logging is only
190 # hooked up in parallel applications
191 self._log_handler = logging.StreamHandler(open(os.devnull, 'w'))
192 else:
193 self._log_handler = logging.StreamHandler()
194 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
195 self._log_handler.setFormatter(self._log_formatter)
196 self.log.addHandler(self._log_handler)
197
198 @catch_config_error
201 @catch_config_error
199 def initialize(self, argv=None):
202 def initialize(self, argv=None):
200 """Do the basic steps to configure me.
203 """Do the basic steps to configure me.
@@ -403,6 +403,7 b' class HistoryManager(HistoryAccessor):'
403 self.save_flag = threading.Event()
403 self.save_flag = threading.Event()
404 self.db_input_cache_lock = threading.Lock()
404 self.db_input_cache_lock = threading.Lock()
405 self.db_output_cache_lock = threading.Lock()
405 self.db_output_cache_lock = threading.Lock()
406 if self.hist_file != ':memory:':
406 self.save_thread = HistorySavingThread(self)
407 self.save_thread = HistorySavingThread(self)
407 self.save_thread.start()
408 self.save_thread.start()
408
409
@@ -712,6 +712,7 b' class InteractiveShell(SingletonConfigurable, Magic):'
712 self._orig_sys_module_state['stderr'] = sys.stderr
712 self._orig_sys_module_state['stderr'] = sys.stderr
713 self._orig_sys_module_state['excepthook'] = sys.excepthook
713 self._orig_sys_module_state['excepthook'] = sys.excepthook
714 self._orig_sys_modules_main_name = self.user_module.__name__
714 self._orig_sys_modules_main_name = self.user_module.__name__
715 self._orig_sys_modules_main_mod = sys.modules.get(self.user_module.__name__)
715
716
716 def restore_sys_module_state(self):
717 def restore_sys_module_state(self):
717 """Restore the state of the sys module."""
718 """Restore the state of the sys module."""
@@ -721,7 +722,8 b' class InteractiveShell(SingletonConfigurable, Magic):'
721 except AttributeError:
722 except AttributeError:
722 pass
723 pass
723 # Reset what what done in self.init_sys_modules
724 # Reset what what done in self.init_sys_modules
724 sys.modules[self.user_module.__name__] = self._orig_sys_modules_main_name
725 if self._orig_sys_modules_main_mod is not None:
726 sys.modules[self._orig_sys_modules_main_name] = self._orig_sys_modules_main_mod
725
727
726 #-------------------------------------------------------------------------
728 #-------------------------------------------------------------------------
727 # Things related to hooks
729 # Things related to hooks
@@ -2424,7 +2426,7 b' class InteractiveShell(SingletonConfigurable, Magic):'
2424 self.showtraceback()
2426 self.showtraceback()
2425 warn('Unknown failure executing module: <%s>' % mod_name)
2427 warn('Unknown failure executing module: <%s>' % mod_name)
2426
2428
2427 def run_cell(self, raw_cell, store_history=False):
2429 def run_cell(self, raw_cell, store_history=False, silent=False):
2428 """Run a complete IPython cell.
2430 """Run a complete IPython cell.
2429
2431
2430 Parameters
2432 Parameters
@@ -2435,10 +2437,16 b' class InteractiveShell(SingletonConfigurable, Magic):'
2435 If True, the raw and translated cell will be stored in IPython's
2437 If True, the raw and translated cell will be stored in IPython's
2436 history. For user code calling back into IPython's machinery, this
2438 history. For user code calling back into IPython's machinery, this
2437 should be set to False.
2439 should be set to False.
2440 silent : bool
2441 If True, avoid side-effets, such as implicit displayhooks, history,
2442 and logging. silent=True forces store_history=False.
2438 """
2443 """
2439 if (not raw_cell) or raw_cell.isspace():
2444 if (not raw_cell) or raw_cell.isspace():
2440 return
2445 return
2441
2446
2447 if silent:
2448 store_history = False
2449
2442 for line in raw_cell.splitlines():
2450 for line in raw_cell.splitlines():
2443 self.input_splitter.push(line)
2451 self.input_splitter.push(line)
2444 cell = self.input_splitter.source_reset()
2452 cell = self.input_splitter.source_reset()
@@ -2462,7 +2470,7 b' class InteractiveShell(SingletonConfigurable, Magic):'
2462 if store_history:
2470 if store_history:
2463 self.history_manager.store_inputs(self.execution_count,
2471 self.history_manager.store_inputs(self.execution_count,
2464 cell, raw_cell)
2472 cell, raw_cell)
2465
2473 if not silent:
2466 self.logger.log(cell, raw_cell)
2474 self.logger.log(cell, raw_cell)
2467
2475
2468 if not prefilter_failed:
2476 if not prefilter_failed:
@@ -2484,11 +2492,15 b' class InteractiveShell(SingletonConfigurable, Magic):'
2484 self.execution_count += 1
2492 self.execution_count += 1
2485 return None
2493 return None
2486
2494
2495 interactivity = "none" if silent else "last_expr"
2487 self.run_ast_nodes(code_ast.body, cell_name,
2496 self.run_ast_nodes(code_ast.body, cell_name,
2488 interactivity="last_expr")
2497 interactivity=interactivity)
2489
2498
2490 # Execute any registered post-execution functions.
2499 # Execute any registered post-execution functions.
2491 for func, status in self._post_execute.iteritems():
2500 # unless we are silent
2501 post_exec = [] if silent else self._post_execute.iteritems()
2502
2503 for func, status in post_exec:
2492 if self.disable_failing_post_execute and not status:
2504 if self.disable_failing_post_execute and not status:
2493 continue
2505 continue
2494 try:
2506 try:
@@ -255,6 +255,61 b' class InteractiveShellTestCase(unittest.TestCase):'
255 # ZeroDivisionError
255 # ZeroDivisionError
256 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
256 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
257
257
258 def test_silent_nopostexec(self):
259 """run_cell(silent=True) doesn't invoke post-exec funcs"""
260 ip = get_ipython()
261
262 d = dict(called=False)
263 def set_called():
264 d['called'] = True
265
266 ip.register_post_execute(set_called)
267 ip.run_cell("1", silent=True)
268 self.assertFalse(d['called'])
269 # double-check that non-silent exec did what we expected
270 # silent to avoid
271 ip.run_cell("1")
272 self.assertTrue(d['called'])
273 # remove post-exec
274 ip._post_execute.pop(set_called)
275
276 def test_silent_noadvance(self):
277 """run_cell(silent=True) doesn't advance execution_count"""
278 ip = get_ipython()
279
280 ec = ip.execution_count
281 # silent should force store_history=False
282 ip.run_cell("1", store_history=True, silent=True)
283
284 self.assertEquals(ec, ip.execution_count)
285 # double-check that non-silent exec did what we expected
286 # silent to avoid
287 ip.run_cell("1", store_history=True)
288 self.assertEquals(ec+1, ip.execution_count)
289
290 def test_silent_nodisplayhook(self):
291 """run_cell(silent=True) doesn't trigger displayhook"""
292 ip = get_ipython()
293
294 d = dict(called=False)
295
296 trap = ip.display_trap
297 save_hook = trap.hook
298
299 def failing_hook(*args, **kwargs):
300 d['called'] = True
301
302 try:
303 trap.hook = failing_hook
304 ip.run_cell("1", silent=True)
305 self.assertFalse(d['called'])
306 # double-check that non-silent exec did what we expected
307 # silent to avoid
308 ip.run_cell("1")
309 self.assertTrue(d['called'])
310 finally:
311 trap.hook = save_hook
312
258 @skipif(sys.version_info[0] >= 3, "softspace removed in py3")
313 @skipif(sys.version_info[0] >= 3, "softspace removed in py3")
259 def test_print_softspace(self):
314 def test_print_softspace(self):
260 """Verify that softspace is handled correctly when executing multiple
315 """Verify that softspace is handled correctly when executing multiple
@@ -211,7 +211,7 b' class ParalleMagic(Plugin):'
211 print '[stdout:%i]'%eid, stdout
211 print '[stdout:%i]'%eid, stdout
212
212
213
213
214 def pxrun_cell(self, raw_cell, store_history=True):
214 def pxrun_cell(self, raw_cell, store_history=False, silent=False):
215 """drop-in replacement for InteractiveShell.run_cell.
215 """drop-in replacement for InteractiveShell.run_cell.
216
216
217 This executes code remotely, instead of in the local namespace.
217 This executes code remotely, instead of in the local namespace.
@@ -258,7 +258,7 b' class ParalleMagic(Plugin):'
258 return False
258 return False
259 else:
259 else:
260 try:
260 try:
261 result = self.active_view.execute(cell, block=False)
261 result = self.active_view.execute(cell, silent=False, block=False)
262 except:
262 except:
263 ipself.showtraceback()
263 ipself.showtraceback()
264 return True
264 return True
@@ -140,8 +140,6 b' class IPythonConsoleApp(Configurable):'
140 frontend_flags = Any(app_flags)
140 frontend_flags = Any(app_flags)
141 frontend_aliases = Any(app_aliases)
141 frontend_aliases = Any(app_aliases)
142
142
143 pure = CBool(False, config=True,
144 help="Use a pure Python kernel instead of an IPython kernel.")
145 # create requested profiles by default, if they don't exist:
143 # create requested profiles by default, if they don't exist:
146 auto_create = CBool(True)
144 auto_create = CBool(True)
147 # connection info:
145 # connection info:
@@ -330,9 +328,7 b' class IPythonConsoleApp(Configurable):'
330 )
328 )
331 # start the kernel
329 # start the kernel
332 if not self.existing:
330 if not self.existing:
333 kwargs = dict(ipython=not self.pure)
331 self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv)
334 kwargs['extra_arguments'] = self.kernel_argv
335 self.kernel_manager.start_kernel(**kwargs)
336 elif self.sshserver:
332 elif self.sshserver:
337 # ssh, write new connection file
333 # ssh, write new connection file
338 self.kernel_manager.write_connection_file()
334 self.kernel_manager.write_connection_file()
@@ -45,8 +45,6 b' class DummyIPClusterStart(IPClusterStart):'
45
45
46 def init_signal(self):
46 def init_signal(self):
47 pass
47 pass
48 def init_logging(self):
49 pass
50 def reinit_logging(self):
48 def reinit_logging(self):
51 pass
49 pass
52
50
@@ -30,6 +30,7 b' from zmq.utils import jsonapi'
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31 from IPython.zmq.session import Session
31 from IPython.zmq.session import Session
32 from IPython.lib.security import passwd_check
32 from IPython.lib.security import passwd_check
33 from IPython.utils.jsonutil import date_default
33
34
34 try:
35 try:
35 from docutils.core import publish_string
36 from docutils.core import publish_string
@@ -385,13 +386,13 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
385 except KeyError:
386 except KeyError:
386 pass
387 pass
387 msg.pop('buffers')
388 msg.pop('buffers')
388 return jsonapi.dumps(msg)
389 return jsonapi.dumps(msg, default=date_default)
389
390
390 def _on_zmq_reply(self, msg_list):
391 def _on_zmq_reply(self, msg_list):
391 try:
392 try:
392 msg = self._reserialize_reply(msg_list)
393 msg = self._reserialize_reply(msg_list)
393 except:
394 except Exception:
394 self.application.log.critical("Malformed message: %r" % msg_list)
395 self.application.log.critical("Malformed message: %r" % msg_list, exc_info=True)
395 else:
396 else:
396 self.write_message(msg)
397 self.write_message(msg)
397
398
@@ -397,7 +397,6 b' class NotebookApp(BaseIPythonApplication):'
397 self.cluster_manager.update_profiles()
397 self.cluster_manager.update_profiles()
398
398
399 def init_logging(self):
399 def init_logging(self):
400 super(NotebookApp, self).init_logging()
401 # This prevents double log messages because tornado use a root logger that
400 # This prevents double log messages because tornado use a root logger that
402 # self.log is a child of. The logging module dipatches log messages to a log
401 # self.log is a child of. The logging module dipatches log messages to a log
403 # and all of its ancenstors until propagate is set to False.
402 # and all of its ancenstors until propagate is set to False.
@@ -500,6 +499,7 b' class NotebookApp(BaseIPythonApplication):'
500
499
501 @catch_config_error
500 @catch_config_error
502 def initialize(self, argv=None):
501 def initialize(self, argv=None):
502 self.init_logging()
503 super(NotebookApp, self).initialize(argv)
503 super(NotebookApp, self).initialize(argv)
504 self.init_configurables()
504 self.init_configurables()
505 self.init_webapp()
505 self.init_webapp()
@@ -101,9 +101,7 b' ipython qtconsole --pylab=inline # start with pylab in inline plotting mode'
101 # start with copy of flags
101 # start with copy of flags
102 flags = dict(flags)
102 flags = dict(flags)
103 qt_flags = {
103 qt_flags = {
104 'pure' : ({'IPythonQtConsoleApp' : {'pure' : True}},
104 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}},
105 "Use a pure Python kernel instead of an IPython kernel."),
106 'plain' : ({'ConsoleWidget' : {'kind' : 'plain'}},
107 "Disable rich text support."),
105 "Disable rich text support."),
108 }
106 }
109 qt_flags.update(boolean_flag(
107 qt_flags.update(boolean_flag(
@@ -180,18 +178,14 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):'
180 plain = CBool(False, config=True,
178 plain = CBool(False, config=True,
181 help="Use a plaintext widget instead of rich text (plain can't print/save).")
179 help="Use a plaintext widget instead of rich text (plain can't print/save).")
182
180
183 def _pure_changed(self, name, old, new):
181 def _plain_changed(self, name, old, new):
184 kind = 'plain' if self.plain else 'rich'
182 kind = 'plain' if new else 'rich'
185 self.config.ConsoleWidget.kind = kind
183 self.config.ConsoleWidget.kind = kind
186 if self.pure:
184 if new:
187 self.widget_factory = FrontendWidget
188 elif self.plain:
189 self.widget_factory = IPythonWidget
185 self.widget_factory = IPythonWidget
190 else:
186 else:
191 self.widget_factory = RichIPythonWidget
187 self.widget_factory = RichIPythonWidget
192
188
193 _plain_changed = _pure_changed
194
195 # the factory for creating a widget
189 # the factory for creating a widget
196 widget_factory = Any(RichIPythonWidget)
190 widget_factory = Any(RichIPythonWidget)
197
191
@@ -210,7 +204,7 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):'
210 config=self.config,
204 config=self.config,
211 )
205 )
212 # start the kernel
206 # start the kernel
213 kwargs = dict(ipython=not self.pure)
207 kwargs = dict()
214 kwargs['extra_arguments'] = self.kernel_argv
208 kwargs['extra_arguments'] = self.kernel_argv
215 kernel_manager.start_kernel(**kwargs)
209 kernel_manager.start_kernel(**kwargs)
216 kernel_manager.start_channels()
210 kernel_manager.start_channels()
@@ -273,17 +267,13 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):'
273 self.window.add_tab_with_frontend(self.widget)
267 self.window.add_tab_with_frontend(self.widget)
274 self.window.init_menu_bar()
268 self.window.init_menu_bar()
275
269
276 self.window.setWindowTitle('Python' if self.pure else 'IPython')
270 self.window.setWindowTitle('IPython')
277
271
278 def init_colors(self, widget):
272 def init_colors(self, widget):
279 """Configure the coloring of the widget"""
273 """Configure the coloring of the widget"""
280 # Note: This will be dramatically simplified when colors
274 # Note: This will be dramatically simplified when colors
281 # are removed from the backend.
275 # are removed from the backend.
282
276
283 if self.pure:
284 # only IPythonWidget supports styling
285 return
286
287 # parse the colors arg down to current known labels
277 # parse the colors arg down to current known labels
288 try:
278 try:
289 colors = self.config.ZMQInteractiveShell.colors
279 colors = self.config.ZMQInteractiveShell.colors
@@ -46,11 +46,11 b' def get_connection_file(app=None):'
46 If unspecified, the currently running app will be used
46 If unspecified, the currently running app will be used
47 """
47 """
48 if app is None:
48 if app is None:
49 from IPython.zmq.kernelapp import KernelApp
49 from IPython.zmq.ipkernel import IPKernelApp
50 if not KernelApp.initialized():
50 if not IPKernelApp.initialized():
51 raise RuntimeError("app not specified, and not in a running Kernel")
51 raise RuntimeError("app not specified, and not in a running Kernel")
52
52
53 app = KernelApp.instance()
53 app = IPKernelApp.instance()
54 return filefind(app.connection_file, ['.', app.profile_dir.security_dir])
54 return filefind(app.connection_file, ['.', app.profile_dir.security_dir])
55
55
56 def find_connection_file(filename, profile=None):
56 def find_connection_file(filename, profile=None):
@@ -35,6 +35,28 b' from .client.asyncresult import *'
35 from .client.client import Client
35 from .client.client import Client
36 from .client.remotefunction import *
36 from .client.remotefunction import *
37 from .client.view import *
37 from .client.view import *
38 from .util import interactive
38 from .controller.dependency import *
39 from .controller.dependency import *
39
40
41 #-----------------------------------------------------------------------------
42 # Functions
43 #-----------------------------------------------------------------------------
44
45
46 def bind_kernel(**kwargs):
47 """Bind an Engine's Kernel to be used as a full IPython kernel.
48
49 This allows a running Engine to be used simultaneously as a full IPython kernel
50 with the QtConsole or other frontends.
51
52 This function returns immediately.
53 """
54 from IPython.parallel.apps.ipengineapp import IPEngineApp
55 if IPEngineApp.initialized():
56 app = IPEngineApp.instance()
57 else:
58 raise RuntimeError("Must be called from an IPEngineApp instance")
59
60 return app.bind_kernel(**kwargs)
61
40
62
@@ -103,6 +103,10 b' class BaseParallelApplication(BaseIPythonApplication):'
103 # temporarily override default_log_level to INFO
103 # temporarily override default_log_level to INFO
104 return logging.INFO
104 return logging.INFO
105
105
106 def _log_format_default(self):
107 """override default log format to include time"""
108 return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s"
109
106 work_dir = Unicode(os.getcwdu(), config=True,
110 work_dir = Unicode(os.getcwdu(), config=True,
107 help='Set the working dir for the process.'
111 help='Set the working dir for the process.'
108 )
112 )
@@ -175,11 +179,14 b' class BaseParallelApplication(BaseIPythonApplication):'
175 else:
179 else:
176 open_log_file = None
180 open_log_file = None
177 if open_log_file is not None:
181 if open_log_file is not None:
178 self.log.removeHandler(self._log_handler)
182 while self.log.handlers:
183 self.log.removeHandler(self.log.handlers[0])
179 self._log_handler = logging.StreamHandler(open_log_file)
184 self._log_handler = logging.StreamHandler(open_log_file)
180 self.log.addHandler(self._log_handler)
185 self.log.addHandler(self._log_handler)
186 else:
187 self._log_handler = self.log.handlers[0]
181 # Add timestamps to log format:
188 # Add timestamps to log format:
182 self._log_formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
189 self._log_formatter = logging.Formatter(self.log_format,
183 datefmt="%Y-%m-%d %H:%M:%S")
190 datefmt="%Y-%m-%d %H:%M:%S")
184 self._log_handler.setFormatter(self._log_formatter)
191 self._log_handler.setFormatter(self._log_formatter)
185 # do not propagate log messages to root logger
192 # do not propagate log messages to root logger
@@ -434,11 +434,9 b' class IPControllerApp(BaseParallelApplication):'
434 lsock = context.socket(zmq.PUB)
434 lsock = context.socket(zmq.PUB)
435 lsock.connect(self.log_url)
435 lsock.connect(self.log_url)
436 handler = PUBHandler(lsock)
436 handler = PUBHandler(lsock)
437 self.log.removeHandler(self._log_handler)
438 handler.root_topic = 'controller'
437 handler.root_topic = 'controller'
439 handler.setLevel(self.log_level)
438 handler.setLevel(self.log_level)
440 self.log.addHandler(handler)
439 self.log.addHandler(handler)
441 self._log_handler = handler
442
440
443 @catch_config_error
441 @catch_config_error
444 def initialize(self, argv=None):
442 def initialize(self, argv=None):
@@ -37,6 +37,7 b' from IPython.parallel.apps.baseapp import ('
37 catch_config_error,
37 catch_config_error,
38 )
38 )
39 from IPython.zmq.log import EnginePUBHandler
39 from IPython.zmq.log import EnginePUBHandler
40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
40 from IPython.zmq.session import (
41 from IPython.zmq.session import (
41 Session, session_aliases, session_flags
42 Session, session_aliases, session_flags
42 )
43 )
@@ -44,11 +45,11 b' from IPython.zmq.session import ('
44 from IPython.config.configurable import Configurable
45 from IPython.config.configurable import Configurable
45
46
46 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.streamkernel import Kernel
48 from IPython.parallel.util import disambiguate_url
48 from IPython.parallel.util import disambiguate_url, asbytes
49
49
50 from IPython.utils.importstring import import_item
50 from IPython.utils.importstring import import_item
51 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
51 from IPython.utils.py3compat import cast_bytes
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
52
53
53
54
54 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
@@ -174,9 +175,17 b' class IPEngineApp(BaseParallelApplication):'
174 help="""The URL for the iploggerapp instance, for forwarding
175 help="""The URL for the iploggerapp instance, for forwarding
175 logging to a central location.""")
176 logging to a central location.""")
176
177
178 # an IPKernelApp instance, used to setup listening for shell frontends
179 kernel_app = Instance(IPKernelApp)
180
177 aliases = Dict(aliases)
181 aliases = Dict(aliases)
178 flags = Dict(flags)
182 flags = Dict(flags)
179
183
184 @property
185 def kernel(self):
186 """allow access to the Kernel object, so I look like IPKernelApp"""
187 return self.engine.kernel
188
180 def find_url_file(self):
189 def find_url_file(self):
181 """Set the url file.
190 """Set the url file.
182
191
@@ -203,7 +212,7 b' class IPEngineApp(BaseParallelApplication):'
203 d = json.loads(f.read())
212 d = json.loads(f.read())
204
213
205 if 'exec_key' in d:
214 if 'exec_key' in d:
206 config.Session.key = asbytes(d['exec_key'])
215 config.Session.key = cast_bytes(d['exec_key'])
207
216
208 try:
217 try:
209 config.EngineFactory.location
218 config.EngineFactory.location
@@ -221,6 +230,46 b' class IPEngineApp(BaseParallelApplication):'
221 except AttributeError:
230 except AttributeError:
222 config.EngineFactory.sshserver = d['ssh']
231 config.EngineFactory.sshserver = d['ssh']
223
232
233 def bind_kernel(self, **kwargs):
234 """Promote engine to listening kernel, accessible to frontends."""
235 if self.kernel_app is not None:
236 return
237
238 self.log.info("Opening ports for direct connections as an IPython kernel")
239
240 kernel = self.kernel
241
242 kwargs.setdefault('config', self.config)
243 kwargs.setdefault('log', self.log)
244 kwargs.setdefault('profile_dir', self.profile_dir)
245 kwargs.setdefault('session', self.engine.session)
246
247 app = self.kernel_app = IPKernelApp(**kwargs)
248
249 # allow IPKernelApp.instance():
250 IPKernelApp._instance = app
251
252 app.init_connection_file()
253 # relevant contents of init_sockets:
254
255 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
256 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
257
258 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
259 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
260
261 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
262 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
263 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
264
265 # start the heartbeat, and log connection info:
266
267 app.init_heartbeat()
268
269 app.log_connection_info()
270 app.write_connection_file()
271
272
224 def init_engine(self):
273 def init_engine(self):
225 # This is the working dir by now.
274 # This is the working dir by now.
226 sys.path.insert(0, '')
275 sys.path.insert(0, '')
@@ -282,11 +331,9 b' class IPEngineApp(BaseParallelApplication):'
282 context = self.engine.context
331 context = self.engine.context
283 lsock = context.socket(zmq.PUB)
332 lsock = context.socket(zmq.PUB)
284 lsock.connect(self.log_url)
333 lsock.connect(self.log_url)
285 self.log.removeHandler(self._log_handler)
286 handler = EnginePUBHandler(self.engine, lsock)
334 handler = EnginePUBHandler(self.engine, lsock)
287 handler.setLevel(self.log_level)
335 handler.setLevel(self.log_level)
288 self.log.addHandler(handler)
336 self.log.addHandler(handler)
289 self._log_handler = handler
290
337
291 def init_mpi(self):
338 def init_mpi(self):
292 global mpi
339 global mpi
@@ -23,6 +23,7 b' import copy'
23 import logging
23 import logging
24 import os
24 import os
25 import stat
25 import stat
26 import sys
26 import time
27 import time
27
28
28 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
@@ -59,8 +60,8 b' from IPython.utils.text import EvalFormatter'
59 from IPython.utils.traitlets import (
60 from IPython.utils.traitlets import (
60 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
61 )
62 )
62 from IPython.utils.path import get_ipython_module_path, get_home_dir
63 from IPython.utils.path import get_home_dir
63 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, FindCmdError
64
65
65 from .win32support import forward_read_events
66 from .win32support import forward_read_events
66
67
@@ -72,18 +73,13 b" WINDOWS = os.name == 'nt'"
72 # Paths to the kernel apps
73 # Paths to the kernel apps
73 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
74
75
76 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
75
77
76 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
77 'IPython.parallel.apps.ipclusterapp'
78 ))
79
79
80 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
81 'IPython.parallel.apps.ipengineapp'
82 ))
83
81
84 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
82 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
85 'IPython.parallel.apps.ipcontrollerapp'
86 ))
87
83
88 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
89 # Base launchers and errors
85 # Base launchers and errors
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication'
36 from IPython.utils.jsonutil import rekey
36 from IPython.utils.jsonutil import rekey
37 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.py3compat import cast_bytes
39 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 Dict, List, Bool, Set, Any)
41 Dict, List, Bool, Set, Any)
41 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
@@ -71,6 +72,60 b' def spin_first(f, self, *args, **kwargs):'
71 # Classes
72 # Classes
72 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
73
74
75
76 class ExecuteReply(object):
77 """wrapper for finished Execute results"""
78 def __init__(self, msg_id, content, metadata):
79 self.msg_id = msg_id
80 self._content = content
81 self.execution_count = content['execution_count']
82 self.metadata = metadata
83
84 def __getitem__(self, key):
85 return self.metadata[key]
86
87 def __getattr__(self, key):
88 if key not in self.metadata:
89 raise AttributeError(key)
90 return self.metadata[key]
91
92 def __repr__(self):
93 pyout = self.metadata['pyout'] or {}
94 text_out = pyout.get('data', {}).get('text/plain', '')
95 if len(text_out) > 32:
96 text_out = text_out[:29] + '...'
97
98 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
99
100 def _repr_html_(self):
101 pyout = self.metadata['pyout'] or {'data':{}}
102 return pyout['data'].get("text/html")
103
104 def _repr_latex_(self):
105 pyout = self.metadata['pyout'] or {'data':{}}
106 return pyout['data'].get("text/latex")
107
108 def _repr_json_(self):
109 pyout = self.metadata['pyout'] or {'data':{}}
110 return pyout['data'].get("application/json")
111
112 def _repr_javascript_(self):
113 pyout = self.metadata['pyout'] or {'data':{}}
114 return pyout['data'].get("application/javascript")
115
116 def _repr_png_(self):
117 pyout = self.metadata['pyout'] or {'data':{}}
118 return pyout['data'].get("image/png")
119
120 def _repr_jpeg_(self):
121 pyout = self.metadata['pyout'] or {'data':{}}
122 return pyout['data'].get("image/jpeg")
123
124 def _repr_svg_(self):
125 pyout = self.metadata['pyout'] or {'data':{}}
126 return pyout['data'].get("image/svg+xml")
127
128
74 class Metadata(dict):
129 class Metadata(dict):
75 """Subclass of dict for initializing metadata values.
130 """Subclass of dict for initializing metadata values.
76
131
@@ -97,6 +152,7 b' class Metadata(dict):'
97 'pyerr' : None,
152 'pyerr' : None,
98 'stdout' : '',
153 'stdout' : '',
99 'stderr' : '',
154 'stderr' : '',
155 'outputs' : [],
100 }
156 }
101 self.update(md)
157 self.update(md)
102 self.update(dict(*args, **kwargs))
158 self.update(dict(*args, **kwargs))
@@ -308,15 +364,19 b' class Client(HasTraits):'
308 if self._cd is not None:
364 if self._cd is not None:
309 if url_or_file is None:
365 if url_or_file is None:
310 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
366 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
311 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
367 if url_or_file is None:
368 raise ValueError(
369 "I can't find enough information to connect to a hub!"
312 " Please specify at least one of url_or_file or profile."
370 " Please specify at least one of url_or_file or profile."
371 )
313
372
314 if not util.is_url(url_or_file):
373 if not util.is_url(url_or_file):
315 # it's not a url, try for a file
374 # it's not a url, try for a file
316 if not os.path.exists(url_or_file):
375 if not os.path.exists(url_or_file):
317 if self._cd:
376 if self._cd:
318 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
377 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
319 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
378 if not os.path.exists(url_or_file):
379 raise IOError("Connection file not found: %r" % url_or_file)
320 with open(url_or_file) as f:
380 with open(url_or_file) as f:
321 cfg = json.loads(f.read())
381 cfg = json.loads(f.read())
322 else:
382 else:
@@ -369,7 +429,7 b' class Client(HasTraits):'
369 if os.path.isfile(exec_key):
429 if os.path.isfile(exec_key):
370 extra_args['keyfile'] = exec_key
430 extra_args['keyfile'] = exec_key
371 else:
431 else:
372 exec_key = util.asbytes(exec_key)
432 exec_key = cast_bytes(exec_key)
373 extra_args['key'] = exec_key
433 extra_args['key'] = exec_key
374 self.session = Session(**extra_args)
434 self.session = Session(**extra_args)
375
435
@@ -467,7 +527,7 b' class Client(HasTraits):'
467 if not isinstance(targets, (tuple, list, xrange)):
527 if not isinstance(targets, (tuple, list, xrange)):
468 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
528 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
469
529
470 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
530 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
471
531
472 def _connect(self, sshserver, ssh_kwargs, timeout):
532 def _connect(self, sshserver, ssh_kwargs, timeout):
473 """setup all our socket connections to the cluster. This is called from
533 """setup all our socket connections to the cluster. This is called from
@@ -628,7 +688,30 b' class Client(HasTraits):'
628 print ("got unknown result: %s"%msg_id)
688 print ("got unknown result: %s"%msg_id)
629 else:
689 else:
630 self.outstanding.remove(msg_id)
690 self.outstanding.remove(msg_id)
631 self.results[msg_id] = self._unwrap_exception(msg['content'])
691
692 content = msg['content']
693 header = msg['header']
694
695 # construct metadata:
696 md = self.metadata[msg_id]
697 md.update(self._extract_metadata(header, parent, content))
698 # is this redundant?
699 self.metadata[msg_id] = md
700
701 e_outstanding = self._outstanding_dict[md['engine_uuid']]
702 if msg_id in e_outstanding:
703 e_outstanding.remove(msg_id)
704
705 # construct result:
706 if content['status'] == 'ok':
707 self.results[msg_id] = ExecuteReply(msg_id, content, md)
708 elif content['status'] == 'aborted':
709 self.results[msg_id] = error.TaskAborted(msg_id)
710 elif content['status'] == 'resubmitted':
711 # TODO: handle resubmission
712 pass
713 else:
714 self.results[msg_id] = self._unwrap_exception(content)
632
715
633 def _handle_apply_reply(self, msg):
716 def _handle_apply_reply(self, msg):
634 """Save the reply to an apply_request into our results."""
717 """Save the reply to an apply_request into our results."""
@@ -750,8 +833,13 b' class Client(HasTraits):'
750 md.update({'pyerr' : self._unwrap_exception(content)})
833 md.update({'pyerr' : self._unwrap_exception(content)})
751 elif msg_type == 'pyin':
834 elif msg_type == 'pyin':
752 md.update({'pyin' : content['code']})
835 md.update({'pyin' : content['code']})
836 elif msg_type == 'display_data':
837 md['outputs'].append(content)
838 elif msg_type == 'pyout':
839 md['pyout'] = content
753 else:
840 else:
754 md.update({msg_type : content.get('data', '')})
841 # unhandled msg_type (status, etc.)
842 pass
755
843
756 # reduntant?
844 # reduntant?
757 self.metadata[msg_id] = md
845 self.metadata[msg_id] = md
@@ -848,14 +936,14 b' class Client(HasTraits):'
848 """
936 """
849 if self._notification_socket:
937 if self._notification_socket:
850 self._flush_notifications()
938 self._flush_notifications()
939 if self._iopub_socket:
940 self._flush_iopub(self._iopub_socket)
851 if self._mux_socket:
941 if self._mux_socket:
852 self._flush_results(self._mux_socket)
942 self._flush_results(self._mux_socket)
853 if self._task_socket:
943 if self._task_socket:
854 self._flush_results(self._task_socket)
944 self._flush_results(self._task_socket)
855 if self._control_socket:
945 if self._control_socket:
856 self._flush_control(self._control_socket)
946 self._flush_control(self._control_socket)
857 if self._iopub_socket:
858 self._flush_iopub(self._iopub_socket)
859 if self._query_socket:
947 if self._query_socket:
860 self._flush_ignored_hub_replies()
948 self._flush_ignored_hub_replies()
861
949
@@ -1024,14 +1112,16 b' class Client(HasTraits):'
1024
1112
1025 return result
1113 return result
1026
1114
1027 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1115 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1028 ident=None):
1116 ident=None):
1029 """construct and send an apply message via a socket.
1117 """construct and send an apply message via a socket.
1030
1118
1031 This is the principal method with which all engine execution is performed by views.
1119 This is the principal method with which all engine execution is performed by views.
1032 """
1120 """
1033
1121
1034 assert not self._closed, "cannot use me anymore, I'm closed!"
1122 if self._closed:
1123 raise RuntimeError("Client cannot be used after its sockets have been closed")
1124
1035 # defaults:
1125 # defaults:
1036 args = args if args is not None else []
1126 args = args if args is not None else []
1037 kwargs = kwargs if kwargs is not None else {}
1127 kwargs = kwargs if kwargs is not None else {}
@@ -1066,6 +1156,43 b' class Client(HasTraits):'
1066
1156
1067 return msg
1157 return msg
1068
1158
1159 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1160 """construct and send an execute request via a socket.
1161
1162 """
1163
1164 if self._closed:
1165 raise RuntimeError("Client cannot be used after its sockets have been closed")
1166
1167 # defaults:
1168 subheader = subheader if subheader is not None else {}
1169
1170 # validate arguments
1171 if not isinstance(code, basestring):
1172 raise TypeError("code must be text, not %s" % type(code))
1173 if not isinstance(subheader, dict):
1174 raise TypeError("subheader must be dict, not %s" % type(subheader))
1175
1176 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1177
1178
1179 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1180 subheader=subheader)
1181
1182 msg_id = msg['header']['msg_id']
1183 self.outstanding.add(msg_id)
1184 if ident:
1185 # possibly routed to a specific engine
1186 if isinstance(ident, list):
1187 ident = ident[-1]
1188 if ident in self._engines.values():
1189 # save for later, in case of engine death
1190 self._outstanding_dict[ident].add(msg_id)
1191 self.history.append(msg_id)
1192 self.metadata[msg_id]['submitted'] = datetime.now()
1193
1194 return msg
1195
1069 #--------------------------------------------------------------------------
1196 #--------------------------------------------------------------------------
1070 # construct a View object
1197 # construct a View object
1071 #--------------------------------------------------------------------------
1198 #--------------------------------------------------------------------------
@@ -1221,12 +1348,6 b' class Client(HasTraits):'
1221 raise TypeError("indices must be str or int, not %r"%id)
1348 raise TypeError("indices must be str or int, not %r"%id)
1222 theids.append(id)
1349 theids.append(id)
1223
1350
1224 for msg_id in theids:
1225 self.outstanding.discard(msg_id)
1226 if msg_id in self.history:
1227 self.history.remove(msg_id)
1228 self.results.pop(msg_id, None)
1229 self.metadata.pop(msg_id, None)
1230 content = dict(msg_ids = theids)
1351 content = dict(msg_ids = theids)
1231
1352
1232 self.session.send(self._query_socket, 'resubmit_request', content)
1353 self.session.send(self._query_socket, 'resubmit_request', content)
@@ -1238,8 +1359,10 b' class Client(HasTraits):'
1238 content = msg['content']
1359 content = msg['content']
1239 if content['status'] != 'ok':
1360 if content['status'] != 'ok':
1240 raise self._unwrap_exception(content)
1361 raise self._unwrap_exception(content)
1362 mapping = content['resubmitted']
1363 new_ids = [ mapping[msg_id] for msg_id in theids ]
1241
1364
1242 ar = AsyncHubResult(self, msg_ids=theids)
1365 ar = AsyncHubResult(self, msg_ids=new_ids)
1243
1366
1244 if block:
1367 if block:
1245 ar.wait()
1368 ar.wait()
@@ -128,13 +128,20 b' class View(HasTraits):'
128
128
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
130
130
131
132 def __repr__(self):
131 def __repr__(self):
133 strtargets = str(self.targets)
132 strtargets = str(self.targets)
134 if len(strtargets) > 16:
133 if len(strtargets) > 16:
135 strtargets = strtargets[:12]+'...]'
134 strtargets = strtargets[:12]+'...]'
136 return "<%s %s>"%(self.__class__.__name__, strtargets)
135 return "<%s %s>"%(self.__class__.__name__, strtargets)
137
136
137 def __len__(self):
138 if isinstance(self.targets, list):
139 return len(self.targets)
140 elif isinstance(self.targets, int):
141 return 1
142 else:
143 return len(self.client)
144
138 def set_flags(self, **kwargs):
145 def set_flags(self, **kwargs):
139 """set my attribute flags by keyword.
146 """set my attribute flags by keyword.
140
147
@@ -195,7 +202,7 b' class View(HasTraits):'
195 @sync_results
202 @sync_results
196 @save_ids
203 @save_ids
197 def _really_apply(self, f, args, kwargs, block=None, **options):
204 def _really_apply(self, f, args, kwargs, block=None, **options):
198 """wrapper for client.send_apply_message"""
205 """wrapper for client.send_apply_request"""
199 raise NotImplementedError("Implement in subclasses")
206 raise NotImplementedError("Implement in subclasses")
200
207
201 def apply(self, f, *args, **kwargs):
208 def apply(self, f, *args, **kwargs):
@@ -533,7 +540,7 b' class DirectView(View):'
533 msg_ids = []
540 msg_ids = []
534 trackers = []
541 trackers = []
535 for ident in _idents:
542 for ident in _idents:
536 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
543 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
537 ident=ident)
544 ident=ident)
538 if track:
545 if track:
539 trackers.append(msg['tracker'])
546 trackers.append(msg['tracker'])
@@ -547,6 +554,7 b' class DirectView(View):'
547 pass
554 pass
548 return ar
555 return ar
549
556
557
550 @spin_after
558 @spin_after
551 def map(self, f, *sequences, **kwargs):
559 def map(self, f, *sequences, **kwargs):
552 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
560 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
@@ -590,7 +598,9 b' class DirectView(View):'
590 pf = ParallelFunction(self, f, block=block, **kwargs)
598 pf = ParallelFunction(self, f, block=block, **kwargs)
591 return pf.map(*sequences)
599 return pf.map(*sequences)
592
600
593 def execute(self, code, targets=None, block=None):
601 @sync_results
602 @save_ids
603 def execute(self, code, silent=True, targets=None, block=None):
594 """Executes `code` on `targets` in blocking or nonblocking manner.
604 """Executes `code` on `targets` in blocking or nonblocking manner.
595
605
596 ``execute`` is always `bound` (affects engine namespace)
606 ``execute`` is always `bound` (affects engine namespace)
@@ -604,7 +614,22 b' class DirectView(View):'
604 whether or not to wait until done to return
614 whether or not to wait until done to return
605 default: self.block
615 default: self.block
606 """
616 """
607 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
617 block = self.block if block is None else block
618 targets = self.targets if targets is None else targets
619
620 _idents = self.client._build_targets(targets)[0]
621 msg_ids = []
622 trackers = []
623 for ident in _idents:
624 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
625 msg_ids.append(msg['header']['msg_id'])
626 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
627 if block:
628 try:
629 ar.get()
630 except KeyboardInterrupt:
631 pass
632 return ar
608
633
609 def run(self, filename, targets=None, block=None):
634 def run(self, filename, targets=None, block=None):
610 """Execute contents of `filename` on my engine(s).
635 """Execute contents of `filename` on my engine(s).
@@ -996,7 +1021,7 b' class LoadBalancedView(View):'
996 follow = self._render_dependency(follow)
1021 follow = self._render_dependency(follow)
997 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
1022 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
998
1023
999 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
1024 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1000 subheader=subheader)
1025 subheader=subheader)
1001 tracker = None if track is False else msg['tracker']
1026 tracker = None if track is False else msg['tracker']
1002
1027
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice'
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.py3compat import str_to_bytes
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27
28
28 from IPython.parallel.util import asbytes, log_errors
29 from IPython.parallel.util import log_errors
29
30
30 class Heart(object):
31 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
32 """A basic heart object for responding to a HeartMonitor.
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):'
123 self.responses = set()
124 self.responses = set()
124 # print self.on_probation, self.hearts
125 # print self.on_probation, self.hearts
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 self.pingstream.send(asbytes(str(self.lifetime)))
127 self.pingstream.send(str_to_bytes(str(self.lifetime)))
127 # flush stream to force immediate socket send
128 # flush stream to force immediate socket send
128 self.pingstream.flush()
129 self.pingstream.flush()
129
130
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):'
151 @log_errors
152 @log_errors
152 def handle_pong(self, msg):
153 def handle_pong(self, msg):
153 "a heart just beat"
154 "a heart just beat"
154 current = asbytes(str(self.lifetime))
155 current = str_to_bytes(str(self.lifetime))
155 last = asbytes(str(self.last_ping))
156 last = str_to_bytes(str(self.last_ping))
156 if msg[1] == current:
157 if msg[1] == current:
157 delta = time.time()-self.tic
158 delta = time.time()-self.tic
158 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
159 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream'
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 )
34 )
@@ -441,7 +442,7 b' class Hub(SessionFactory):'
441 for t in targets:
442 for t in targets:
442 # map raw identities to ids
443 # map raw identities to ids
443 if isinstance(t, (str,unicode)):
444 if isinstance(t, (str,unicode)):
444 t = self.by_ident.get(t, t)
445 t = self.by_ident.get(cast_bytes(t), t)
445 _targets.append(t)
446 _targets.append(t)
446 targets = _targets
447 targets = _targets
447 bad_targets = [ t for t in targets if t not in self.ids ]
448 bad_targets = [ t for t in targets if t not in self.ids ]
@@ -467,13 +468,13 b' class Hub(SessionFactory):'
467 except ValueError:
468 except ValueError:
468 idents=[]
469 idents=[]
469 if not idents:
470 if not idents:
470 self.log.error("Bad Monitor Message: %r", msg)
471 self.log.error("Monitor message without topic: %r", msg)
471 return
472 return
472 handler = self.monitor_handlers.get(switch, None)
473 handler = self.monitor_handlers.get(switch, None)
473 if handler is not None:
474 if handler is not None:
474 handler(idents, msg)
475 handler(idents, msg)
475 else:
476 else:
476 self.log.error("Invalid monitor topic: %r", switch)
477 self.log.error("Unrecognized monitor topic: %r", switch)
477
478
478
479
479 @util.log_errors
480 @util.log_errors
@@ -719,14 +720,17 b' class Hub(SessionFactory):'
719 self.unassigned.remove(msg_id)
720 self.unassigned.remove(msg_id)
720
721
721 header = msg['header']
722 header = msg['header']
722 engine_uuid = header.get('engine', None)
723 engine_uuid = header.get('engine', u'')
723 eid = self.by_ident.get(engine_uuid, None)
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
725
726 status = header.get('status', None)
724
727
725 if msg_id in self.pending:
728 if msg_id in self.pending:
726 self.log.info("task::task %r finished on %s", msg_id, eid)
729 self.log.info("task::task %r finished on %s", msg_id, eid)
727 self.pending.remove(msg_id)
730 self.pending.remove(msg_id)
728 self.all_completed.add(msg_id)
731 self.all_completed.add(msg_id)
729 if eid is not None:
732 if eid is not None:
733 if status != 'aborted':
730 self.completed[eid].append(msg_id)
734 self.completed[eid].append(msg_id)
731 if msg_id in self.tasks[eid]:
735 if msg_id in self.tasks[eid]:
732 self.tasks[eid].remove(msg_id)
736 self.tasks[eid].remove(msg_id)
@@ -760,7 +764,7 b' class Hub(SessionFactory):'
760 # print (content)
764 # print (content)
761 msg_id = content['msg_id']
765 msg_id = content['msg_id']
762 engine_uuid = content['engine_id']
766 engine_uuid = content['engine_id']
763 eid = self.by_ident[util.asbytes(engine_uuid)]
767 eid = self.by_ident[cast_bytes(engine_uuid)]
764
768
765 self.log.info("task::task %r arrived on %r", msg_id, eid)
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
766 if msg_id in self.unassigned:
770 if msg_id in self.unassigned:
@@ -796,7 +800,7 b' class Hub(SessionFactory):'
796
800
797 parent = msg['parent_header']
801 parent = msg['parent_header']
798 if not parent:
802 if not parent:
799 self.log.error("iopub::invalid IOPub message: %r", msg)
803 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
800 return
804 return
801 msg_id = parent['msg_id']
805 msg_id = parent['msg_id']
802 msg_type = msg['header']['msg_type']
806 msg_type = msg['header']['msg_type']
@@ -850,13 +854,13 b' class Hub(SessionFactory):'
850 """Register a new engine."""
854 """Register a new engine."""
851 content = msg['content']
855 content = msg['content']
852 try:
856 try:
853 queue = util.asbytes(content['queue'])
857 queue = cast_bytes(content['queue'])
854 except KeyError:
858 except KeyError:
855 self.log.error("registration::queue not specified", exc_info=True)
859 self.log.error("registration::queue not specified", exc_info=True)
856 return
860 return
857 heart = content.get('heartbeat', None)
861 heart = content.get('heartbeat', None)
858 if heart:
862 if heart:
859 heart = util.asbytes(heart)
863 heart = cast_bytes(heart)
860 """register a new engine, and create the socket(s) necessary"""
864 """register a new engine, and create the socket(s) necessary"""
861 eid = self._next_id
865 eid = self._next_id
862 # print (eid, queue, reg, heart)
866 # print (eid, queue, reg, heart)
@@ -1130,7 +1134,7 b' class Hub(SessionFactory):'
1130
1134
1131 # validate msg_ids
1135 # validate msg_ids
1132 found_ids = [ rec['msg_id'] for rec in records ]
1136 found_ids = [ rec['msg_id'] for rec in records ]
1133 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1134 if len(records) > len(msg_ids):
1138 if len(records) > len(msg_ids):
1135 try:
1139 try:
1136 raise RuntimeError("DB appears to be in an inconsistent state."
1140 raise RuntimeError("DB appears to be in an inconsistent state."
@@ -1143,40 +1147,46 b' class Hub(SessionFactory):'
1143 raise KeyError("No such msg(s): %r" % missing)
1147 raise KeyError("No such msg(s): %r" % missing)
1144 except KeyError:
1148 except KeyError:
1145 return finish(error.wrap_exception())
1149 return finish(error.wrap_exception())
1146 elif invalid_ids:
1150 elif pending_ids:
1147 msg_id = invalid_ids[0]
1151 pass
1148 try:
1152 # no need to raise on resubmit of pending task, now that we
1149 raise ValueError("Task %r appears to be inflight" % msg_id)
1153 # resubmit under new ID, but do we want to raise anyway?
1150 except Exception:
1154 # msg_id = invalid_ids[0]
1151 return finish(error.wrap_exception())
1155 # try:
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 # except Exception:
1158 # return finish(error.wrap_exception())
1159
1160 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1152
1162
1153 # clear the existing records
1154 now = datetime.now()
1155 rec = empty_record()
1156 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1157 rec['resubmitted'] = now
1158 rec['queue'] = 'task'
1159 rec['client_uuid'] = client_id[0]
1160 try:
1161 for msg_id in msg_ids:
1162 self.all_completed.discard(msg_id)
1163 self.db.update_record(msg_id, rec)
1164 except Exception:
1165 self.log.error('db::db error upating record', exc_info=True)
1166 reply = error.wrap_exception()
1167 else:
1168 # send the messages
1163 # send the messages
1169 for rec in records:
1164 for rec in records:
1170 header = rec['header']
1165 header = rec['header']
1171 # include resubmitted in header to prevent digest collision
1172 header['resubmitted'] = now
1173 msg = self.session.msg(header['msg_type'])
1166 msg = self.session.msg(header['msg_type'])
1167 msg_id = msg['msg_id']
1174 msg['content'] = rec['content']
1168 msg['content'] = rec['content']
1169 header.update(msg['header'])
1175 msg['header'] = header
1170 msg['header'] = header
1176 msg['header']['msg_id'] = rec['msg_id']
1171
1177 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1172 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1178
1173
1179 finish(dict(status='ok'))
1174 resubmitted[rec['msg_id']] = msg_id
1175 self.pending.add(msg_id)
1176 msg['buffers'] = []
1177 try:
1178 self.db.add_record(msg_id, init_record(msg))
1179 except Exception:
1180 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1181
1182 finish(dict(status='ok', resubmitted=resubmitted))
1183
1184 # store the new IDs in the Task DB
1185 for msg_id, resubmit_id in resubmitted.iteritems():
1186 try:
1187 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1188 except Exception:
1189 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1180
1190
1181
1191
1182 def _extract_record(self, rec):
1192 def _extract_record(self, rec):
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator'
42 from IPython.config.application import Application
42 from IPython.config.application import Application
43 from IPython.config.loader import Config
43 from IPython.config.loader import Config
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45 from IPython.utils.py3compat import cast_bytes
45
46
46 from IPython.parallel import error, util
47 from IPython.parallel import error, util
47 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger, asbytes
49 from IPython.parallel.util import connect_logger, local_logger
49
50
50 from .dependency import Dependency
51 from .dependency import Dependency
51
52
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):'
262 self.log.error("Unhandled message type: %r"%msg_type)
263 self.log.error("Unhandled message type: %r"%msg_type)
263 else:
264 else:
264 try:
265 try:
265 handler(asbytes(msg['content']['queue']))
266 handler(cast_bytes(msg['content']['queue']))
266 except Exception:
267 except Exception:
267 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
268 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
268
269
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):'
316 # prevent double-handling of messages
317 # prevent double-handling of messages
317 continue
318 continue
318
319
319 raw_msg = lost[msg_id][0]
320 raw_msg = lost[msg_id].raw_msg
320 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 parent = self.session.unpack(msg[1].bytes)
322 parent = self.session.unpack(msg[1].bytes)
322 idents = [engine, idents[0]]
323 idents = [engine, idents[0]]
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):'
370 # get targets as a set of bytes objects
371 # get targets as a set of bytes objects
371 # from a list of unicode objects
372 # from a list of unicode objects
372 targets = header.get('targets', [])
373 targets = header.get('targets', [])
373 targets = map(asbytes, targets)
374 targets = map(cast_bytes, targets)
374 targets = set(targets)
375 targets = set(targets)
375
376
376 retries = header.get('retries', 0)
377 retries = header.get('retries', 0)
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):'
138 'engine_uuid' : 'text',
138 'engine_uuid' : 'text',
139 'started' : 'timestamp',
139 'started' : 'timestamp',
140 'completed' : 'timestamp',
140 'completed' : 'timestamp',
141 'resubmitted' : 'timestamp',
141 'resubmitted' : 'text',
142 'received' : 'timestamp',
142 'received' : 'timestamp',
143 'result_header' : 'dict text',
143 'result_header' : 'dict text',
144 'result_content' : 'dict text',
144 'result_content' : 'dict text',
@@ -226,15 +226,16 b' class SQLiteDB(BaseDB):'
226 # isolation_level = None)#,
226 # isolation_level = None)#,
227 cached_statements=64)
227 cached_statements=64)
228 # print dir(self._db)
228 # print dir(self._db)
229 first_table = self.table
229 first_table = previous_table = self.table
230 i=0
230 i=0
231 while not self._check_table():
231 while not self._check_table():
232 i+=1
232 i+=1
233 self.table = first_table+'_%i'%i
233 self.table = first_table+'_%i'%i
234 self.log.warn(
234 self.log.warn(
235 "Table %s exists and doesn't match db format, trying %s"%
235 "Table %s exists and doesn't match db format, trying %s"%
236 (first_table,self.table)
236 (previous_table, self.table)
237 )
237 )
238 previous_table = self.table
238
239
239 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
240 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
240 (msg_id text PRIMARY KEY,
241 (msg_id text PRIMARY KEY,
@@ -246,7 +247,7 b' class SQLiteDB(BaseDB):'
246 engine_uuid text,
247 engine_uuid text,
247 started timestamp,
248 started timestamp,
248 completed timestamp,
249 completed timestamp,
249 resubmitted timestamp,
250 resubmitted text,
250 received timestamp,
251 received timestamp,
251 result_header dict text,
252 result_header dict text,
252 result_content dict text,
253 result_content dict text,
@@ -27,15 +27,14 b' from IPython.external.ssh import tunnel'
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 )
29 )
30 from IPython.utils import py3compat
30 from IPython.utils.py3compat import cast_bytes
31
31
32 from IPython.parallel.controller.heartmonitor import Heart
32 from IPython.parallel.controller.heartmonitor import Heart
33 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
34 from IPython.parallel.util import disambiguate_url, asbytes
34 from IPython.parallel.util import disambiguate_url
35
35
36 from IPython.zmq.session import Message
36 from IPython.zmq.session import Message
37
37 from IPython.zmq.ipkernel import Kernel
38 from .streamkernel import Kernel
39
38
40 class EngineFactory(RegistrationFactory):
39 class EngineFactory(RegistrationFactory):
41 """IPython engine"""
40 """IPython engine"""
@@ -70,7 +69,7 b' class EngineFactory(RegistrationFactory):'
70 bident = CBytes()
69 bident = CBytes()
71 ident = Unicode()
70 ident = Unicode()
72 def _ident_changed(self, name, old, new):
71 def _ident_changed(self, name, old, new):
73 self.bident = asbytes(new)
72 self.bident = cast_bytes(new)
74 using_ssh=Bool(False)
73 using_ssh=Bool(False)
75
74
76
75
@@ -185,23 +184,27 b' class EngineFactory(RegistrationFactory):'
185
184
186 # create iopub stream:
185 # create iopub stream:
187 iopub_addr = msg.content.iopub
186 iopub_addr = msg.content.iopub
188 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
187 iopub_socket = ctx.socket(zmq.PUB)
189 iopub_stream.setsockopt(zmq.IDENTITY, identity)
188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
190 connect(iopub_stream, iopub_addr)
189 connect(iopub_socket, iopub_addr)
190
191 # disable history:
192 self.config.HistoryManager.hist_file = ':memory:'
191
193
192 # # Redirect input streams and set a display hook.
194 # Redirect input streams and set a display hook.
193 if self.out_stream_factory:
195 if self.out_stream_factory:
194 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
195 sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id)
197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
196 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
197 sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id)
199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
198 if self.display_hook_factory:
200 if self.display_hook_factory:
199 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
200 sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id)
202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
201
203
202 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
203 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
204 loop=loop, user_ns = self.user_ns, log=self.log)
206 loop=loop, user_ns=self.user_ns, log=self.log)
207 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
205 self.kernel.start()
208 self.kernel.start()
206
209
207
210
@@ -65,7 +65,7 b' def setup():'
65 if cp.poll() is not None:
65 if cp.poll() is not None:
66 print cp.poll()
66 print cp.poll()
67 raise RuntimeError("The test controller failed to start.")
67 raise RuntimeError("The test controller failed to start.")
68 elif time.time()-tic > 10:
68 elif time.time()-tic > 15:
69 raise RuntimeError("Timeout waiting for the test controller to start.")
69 raise RuntimeError("Timeout waiting for the test controller to start.")
70 time.sleep(0.1)
70 time.sleep(0.1)
71 add_engines(1)
71 add_engines(1)
@@ -93,7 +93,7 b" def add_engines(n=1, profile='iptest', total=False):"
93 while len(rc) < base+n:
93 while len(rc) < base+n:
94 if any([ ep.poll() is not None for ep in eps ]):
94 if any([ ep.poll() is not None for ep in eps ]):
95 raise RuntimeError("A test engine failed to start.")
95 raise RuntimeError("A test engine failed to start.")
96 elif time.time()-tic > 10:
96 elif time.time()-tic > 15:
97 raise RuntimeError("Timeout waiting for engines to connect.")
97 raise RuntimeError("Timeout waiting for engines to connect.")
98 time.sleep(.1)
98 time.sleep(.1)
99 rc.spin()
99 rc.spin()
@@ -154,7 +154,7 b' class AsyncResultTest(ClusterTestCase):'
154 ar = v.apply_async(time.sleep, 0.25)
154 ar = v.apply_async(time.sleep, 0.25)
155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
156 ar.get(2)
156 ar.get(2)
157 self.assertTrue(ar.serial_time < 0.5)
157 self.assertTrue(ar.serial_time < 1.)
158 self.assertTrue(ar.serial_time > 0.2)
158 self.assertTrue(ar.serial_time > 0.2)
159
159
160 def test_serial_time_multi(self):
160 def test_serial_time_multi(self):
@@ -171,8 +171,8 b' class AsyncResultTest(ClusterTestCase):'
171 ar = v.apply_async(time.sleep, 0.25)
171 ar = v.apply_async(time.sleep, 0.25)
172 while not ar.ready():
172 while not ar.ready():
173 time.sleep(0.01)
173 time.sleep(0.01)
174 self.assertTrue(ar.elapsed < 0.3)
174 self.assertTrue(ar.elapsed < 1)
175 self.assertTrue(ar.elapsed < 0.3)
175 self.assertTrue(ar.elapsed < 1)
176 ar.get(2)
176 ar.get(2)
177
177
178 def test_elapsed_multi(self):
178 def test_elapsed_multi(self):
@@ -180,8 +180,8 b' class AsyncResultTest(ClusterTestCase):'
180 ar = v.apply_async(time.sleep, 0.25)
180 ar = v.apply_async(time.sleep, 0.25)
181 while not ar.ready():
181 while not ar.ready():
182 time.sleep(0.01)
182 time.sleep(0.01)
183 self.assertTrue(ar.elapsed < 0.3)
183 self.assertTrue(ar.elapsed < 1)
184 self.assertTrue(ar.elapsed < 0.3)
184 self.assertTrue(ar.elapsed < 1)
185 ar.get(2)
185 ar.get(2)
186
186
187 def test_hubresult_timestamps(self):
187 def test_hubresult_timestamps(self):
@@ -280,6 +280,25 b' class TestClient(ClusterTestCase):'
280 time.sleep(0.25)
280 time.sleep(0.25)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282
282
283 def _wait_for_idle(self):
284 """wait for an engine to become idle, according to the Hub"""
285 rc = self.client
286
287 # timeout 2s, polling every 100ms
288 for i in range(20):
289 qs = rc.queue_status()
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 time.sleep(0.1)
292 else:
293 break
294
295 # ensure Hub up to date:
296 qs = rc.queue_status()
297 self.assertEquals(qs['unassigned'], 0)
298 for eid in rc.ids:
299 self.assertEquals(qs[eid]['tasks'], 0)
300
301
283 def test_resubmit(self):
302 def test_resubmit(self):
284 def f():
303 def f():
285 import random
304 import random
@@ -288,19 +307,38 b' class TestClient(ClusterTestCase):'
288 ar = v.apply_async(f)
307 ar = v.apply_async(f)
289 r1 = ar.get(1)
308 r1 = ar.get(1)
290 # give the Hub a chance to notice:
309 # give the Hub a chance to notice:
291 time.sleep(0.5)
310 self._wait_for_idle()
292 ahr = self.client.resubmit(ar.msg_ids)
311 ahr = self.client.resubmit(ar.msg_ids)
293 r2 = ahr.get(1)
312 r2 = ahr.get(1)
294 self.assertFalse(r1 == r2)
313 self.assertFalse(r1 == r2)
295
314
315 def test_resubmit_aborted(self):
316 def f():
317 import random
318 return random.random()
319 v = self.client.load_balanced_view()
320 # restrict to one engine, so we can put a sleep
321 # ahead of the task, so it will get aborted
322 eid = self.client.ids[-1]
323 v.targets = [eid]
324 sleep = v.apply_async(time.sleep, 0.5)
325 ar = v.apply_async(f)
326 ar.abort()
327 self.assertRaises(error.TaskAborted, ar.get)
328 # Give the Hub a chance to get up to date:
329 self._wait_for_idle()
330 ahr = self.client.resubmit(ar.msg_ids)
331 r2 = ahr.get(1)
332
296 def test_resubmit_inflight(self):
333 def test_resubmit_inflight(self):
297 """ensure ValueError on resubmit of inflight task"""
334 """resubmit of inflight task"""
298 v = self.client.load_balanced_view()
335 v = self.client.load_balanced_view()
299 ar = v.apply_async(time.sleep,1)
336 ar = v.apply_async(time.sleep,1)
300 # give the message a chance to arrive
337 # give the message a chance to arrive
301 time.sleep(0.2)
338 time.sleep(0.2)
302 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
339 ahr = self.client.resubmit(ar.msg_ids)
303 ar.get(2)
340 ar.get(2)
341 ahr.get(2)
304
342
305 def test_resubmit_badkey(self):
343 def test_resubmit_badkey(self):
306 """ensure KeyError on resubmit of nonexistant task"""
344 """ensure KeyError on resubmit of nonexistant task"""
@@ -18,6 +18,7 b' Authors:'
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import logging
21 import os
22 import os
22 import tempfile
23 import tempfile
23 import time
24 import time
@@ -226,7 +227,9 b' class TestSQLiteBackend(TestDictBackend):'
226 @dec.skip_without('sqlite3')
227 @dec.skip_without('sqlite3')
227 def create_db(self):
228 def create_db(self):
228 location, fname = os.path.split(temp_db)
229 location, fname = os.path.split(temp_db)
229 return SQLiteDB(location=location, fname=fname)
230 log = logging.getLogger('test')
231 log.setLevel(logging.CRITICAL)
232 return SQLiteDB(location=location, fname=fname, log=log)
230
233
231 def tearDown(self):
234 def tearDown(self):
232 self.db._db.close()
235 self.db._db.close()
@@ -25,6 +25,7 b' import zmq'
25 from nose import SkipTest
25 from nose import SkipTest
26
26
27 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
28 from IPython.testing.ipunittest import ParametricTestCase
28
29
29 from IPython import parallel as pmod
30 from IPython import parallel as pmod
30 from IPython.parallel import error
31 from IPython.parallel import error
@@ -39,7 +40,7 b' from .clienttest import ClusterTestCase, crash, wait, skip_without'
39 def setup():
40 def setup():
40 add_engines(3, total=True)
41 add_engines(3, total=True)
41
42
42 class TestView(ClusterTestCase):
43 class TestView(ClusterTestCase, ParametricTestCase):
43
44
44 def test_z_crash_mux(self):
45 def test_z_crash_mux(self):
45 """test graceful handling of engine death (direct)"""
46 """test graceful handling of engine death (direct)"""
@@ -421,16 +422,16 b' class TestView(ClusterTestCase):'
421 sys.stdout = sio
422 sys.stdout = sio
422 ip.magic_autopx()
423 ip.magic_autopx()
423 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
424 ip.run_cell('print b')
425 ip.run_cell('b*=2')
426 ip.run_cell('print (b)')
425 ip.run_cell("b/c")
427 ip.run_cell("b/c")
426 ip.run_code(compile('b*=2', '', 'single'))
427 ip.magic_autopx()
428 ip.magic_autopx()
428 sys.stdout = savestdout
429 sys.stdout = savestdout
429 output = sio.getvalue().strip()
430 output = sio.getvalue().strip()
430 self.assertTrue(output.startswith('%autopx enabled'))
431 self.assertTrue(output.startswith('%autopx enabled'))
431 self.assertTrue(output.endswith('%autopx disabled'))
432 self.assertTrue(output.endswith('%autopx disabled'))
432 self.assertTrue('RemoteError: ZeroDivisionError' in output)
433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
433 ar = v.get_result(-2)
434 ar = v.get_result(-1)
434 self.assertEquals(v['a'], 5)
435 self.assertEquals(v['a'], 5)
435 self.assertEquals(v['b'], 20)
436 self.assertEquals(v['b'], 20)
436 self.assertRaisesRemote(ZeroDivisionError, ar.get)
437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
@@ -446,9 +447,10 b' class TestView(ClusterTestCase):'
446 sys.stdout = sio
447 sys.stdout = sio
447 ip.magic_autopx()
448 ip.magic_autopx()
448 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
449 ip.run_cell('print b')
450 ip.run_cell('print (b)')
451 ip.run_cell('import time; time.sleep(0.1)')
450 ip.run_cell("b/c")
452 ip.run_cell("b/c")
451 ip.run_code(compile('b*=2', '', 'single'))
453 ip.run_cell('b*=2')
452 ip.magic_autopx()
454 ip.magic_autopx()
453 sys.stdout = savestdout
455 sys.stdout = savestdout
454 output = sio.getvalue().strip()
456 output = sio.getvalue().strip()
@@ -456,9 +458,12 b' class TestView(ClusterTestCase):'
456 self.assertTrue(output.endswith('%autopx disabled'))
458 self.assertTrue(output.endswith('%autopx disabled'))
457 self.assertFalse('ZeroDivisionError' in output)
459 self.assertFalse('ZeroDivisionError' in output)
458 ar = v.get_result(-2)
460 ar = v.get_result(-2)
459 self.assertEquals(v['a'], 5)
460 self.assertEquals(v['b'], 20)
461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
462 # prevent TaskAborted on pulls, due to ZeroDivisionError
463 time.sleep(0.5)
464 self.assertEquals(v['a'], 5)
465 # b*=2 will not fire, due to abort
466 self.assertEquals(v['b'], 10)
462
467
463 def test_magic_result(self):
468 def test_magic_result(self):
464 ip = get_ipython()
469 ip = get_ipython()
@@ -551,3 +556,139 b' class TestView(ClusterTestCase):'
551 result = e0.map_sync(lambda x: -1*x, r)
556 result = e0.map_sync(lambda x: -1*x, r)
552 self.assertEquals(result, check)
557 self.assertEquals(result, check)
553
558
559 def test_len(self):
560 """len(view) makes sense"""
561 e0 = self.client[self.client.ids[0]]
562 yield self.assertEquals(len(e0), 1)
563 v = self.client[:]
564 yield self.assertEquals(len(v), len(self.client.ids))
565 v = self.client.direct_view('all')
566 yield self.assertEquals(len(v), len(self.client.ids))
567 v = self.client[:2]
568 yield self.assertEquals(len(v), 2)
569 v = self.client[:1]
570 yield self.assertEquals(len(v), 1)
571 v = self.client.load_balanced_view()
572 yield self.assertEquals(len(v), len(self.client.ids))
573 # parametric tests seem to require manual closing?
574 self.client.close()
575
576
577 # begin execute tests
578 def _wait_for(self, f, timeout=10):
579 tic = time.time()
580 while time.time() <= tic + timeout:
581 if f():
582 return
583 time.sleep(0.1)
584 self.client.spin()
585 if not f():
586 print "Warning: Awaited condition never arrived"
587
588
589 def test_execute_reply(self):
590 e0 = self.client[self.client.ids[0]]
591 e0.block = True
592 ar = e0.execute("5", silent=False)
593 er = ar.get()
594 self._wait_for(lambda : bool(er.pyout))
595 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
596 self.assertEquals(er.pyout['data']['text/plain'], '5')
597
598 def test_execute_reply_stdout(self):
599 e0 = self.client[self.client.ids[0]]
600 e0.block = True
601 ar = e0.execute("print (5)", silent=False)
602 er = ar.get()
603 self._wait_for(lambda : bool(er.stdout))
604 self.assertEquals(er.stdout.strip(), '5')
605
606 def test_execute_pyout(self):
607 """execute triggers pyout with silent=False"""
608 view = self.client[:]
609 ar = view.execute("5", silent=False, block=True)
610 self._wait_for(lambda : all(ar.pyout))
611
612 expected = [{'text/plain' : '5'}] * len(view)
613 mimes = [ out['data'] for out in ar.pyout ]
614 self.assertEquals(mimes, expected)
615
616 def test_execute_silent(self):
617 """execute does not trigger pyout with silent=True"""
618 view = self.client[:]
619 ar = view.execute("5", block=True)
620 expected = [None] * len(view)
621 self.assertEquals(ar.pyout, expected)
622
623 def test_execute_magic(self):
624 """execute accepts IPython commands"""
625 view = self.client[:]
626 view.execute("a = 5")
627 ar = view.execute("%whos", block=True)
628 # this will raise, if that failed
629 ar.get(5)
630 self._wait_for(lambda : all(ar.stdout))
631 for stdout in ar.stdout:
632 lines = stdout.splitlines()
633 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
634 found = False
635 for line in lines[2:]:
636 split = line.split()
637 if split == ['a', 'int', '5']:
638 found = True
639 break
640 self.assertTrue(found, "whos output wrong: %s" % stdout)
641
642 def test_execute_displaypub(self):
643 """execute tracks display_pub output"""
644 view = self.client[:]
645 view.execute("from IPython.core.display import *")
646 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
647
648 self._wait_for(lambda : all(len(er.outputs) >= 5 for er in ar))
649 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
650 for outputs in ar.outputs:
651 mimes = [ out['data'] for out in outputs ]
652 self.assertEquals(mimes, expected)
653
654 def test_apply_displaypub(self):
655 """apply tracks display_pub output"""
656 view = self.client[:]
657 view.execute("from IPython.core.display import *")
658
659 @interactive
660 def publish():
661 [ display(i) for i in range(5) ]
662
663 ar = view.apply_async(publish)
664 ar.get(5)
665 self._wait_for(lambda : all(len(out) >= 5 for out in ar.outputs))
666 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
667 for outputs in ar.outputs:
668 mimes = [ out['data'] for out in outputs ]
669 self.assertEquals(mimes, expected)
670
671 def test_execute_raises(self):
672 """exceptions in execute requests raise appropriately"""
673 view = self.client[-1]
674 ar = view.execute("1/0")
675 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
676
677 @dec.skipif_not_matplotlib
678 def test_magic_pylab(self):
679 """%pylab works on engines"""
680 view = self.client[-1]
681 ar = view.execute("%pylab inline")
682 # at least check if this raised:
683 reply = ar.get(5)
684 # include imports, in case user config
685 ar = view.execute("plot(rand(100))", silent=False)
686 reply = ar.get(5)
687 self._wait_for(lambda : all(ar.outputs))
688 self.assertEquals(len(reply.outputs), 1)
689 output = reply.outputs[0]
690 self.assertTrue("data" in output)
691 data = output['data']
692 self.assertTrue("image/png" in data)
693
694
@@ -47,6 +47,9 b' from IPython.utils import py3compat'
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
48 from IPython.utils.newserialized import serialize, unserialize
48 from IPython.utils.newserialized import serialize, unserialize
49 from IPython.zmq.log import EnginePUBHandler
49 from IPython.zmq.log import EnginePUBHandler
50 from IPython.zmq.serialize import (
51 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
52 )
50
53
51 if py3compat.PY3:
54 if py3compat.PY3:
52 buffer = memoryview
55 buffer = memoryview
@@ -121,12 +124,6 b' def log_errors(f, self, *args, **kwargs):'
121 self.log.error("Uncaught exception in %r" % f, exc_info=True)
124 self.log.error("Uncaught exception in %r" % f, exc_info=True)
122
125
123
126
124 def asbytes(s):
125 """ensure that an object is ascii bytes"""
126 if isinstance(s, unicode):
127 s = s.encode('ascii')
128 return s
129
130 def is_url(url):
127 def is_url(url):
131 """boolean check for whether a string is a zmq url"""
128 """boolean check for whether a string is a zmq url"""
132 if '://' not in url:
129 if '://' not in url:
@@ -222,140 +219,6 b' def disambiguate_url(url, location=None):'
222
219
223 return "%s://%s:%s"%(proto,ip,port)
220 return "%s://%s:%s"%(proto,ip,port)
224
221
225 def serialize_object(obj, threshold=64e-6):
226 """Serialize an object into a list of sendable buffers.
227
228 Parameters
229 ----------
230
231 obj : object
232 The object to be serialized
233 threshold : float
234 The threshold for not double-pickling the content.
235
236
237 Returns
238 -------
239 ('pmd', [bufs]) :
240 where pmd is the pickled metadata wrapper,
241 bufs is a list of data buffers
242 """
243 databuffers = []
244 if isinstance(obj, (list, tuple)):
245 clist = canSequence(obj)
246 slist = map(serialize, clist)
247 for s in slist:
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
249 databuffers.append(s.getData())
250 s.data = None
251 return pickle.dumps(slist,-1), databuffers
252 elif isinstance(obj, dict):
253 sobj = {}
254 for k in sorted(obj.iterkeys()):
255 s = serialize(can(obj[k]))
256 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
257 databuffers.append(s.getData())
258 s.data = None
259 sobj[k] = s
260 return pickle.dumps(sobj,-1),databuffers
261 else:
262 s = serialize(can(obj))
263 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
264 databuffers.append(s.getData())
265 s.data = None
266 return pickle.dumps(s,-1),databuffers
267
268
269 def unserialize_object(bufs):
270 """reconstruct an object serialized by serialize_object from data buffers."""
271 bufs = list(bufs)
272 sobj = pickle.loads(bufs.pop(0))
273 if isinstance(sobj, (list, tuple)):
274 for s in sobj:
275 if s.data is None:
276 s.data = bufs.pop(0)
277 return uncanSequence(map(unserialize, sobj)), bufs
278 elif isinstance(sobj, dict):
279 newobj = {}
280 for k in sorted(sobj.iterkeys()):
281 s = sobj[k]
282 if s.data is None:
283 s.data = bufs.pop(0)
284 newobj[k] = uncan(unserialize(s))
285 return newobj, bufs
286 else:
287 if sobj.data is None:
288 sobj.data = bufs.pop(0)
289 return uncan(unserialize(sobj)), bufs
290
291 def pack_apply_message(f, args, kwargs, threshold=64e-6):
292 """pack up a function, args, and kwargs to be sent over the wire
293 as a series of buffers. Any object whose data is larger than `threshold`
294 will not have their data copied (currently only numpy arrays support zero-copy)"""
295 msg = [pickle.dumps(can(f),-1)]
296 databuffers = [] # for large objects
297 sargs, bufs = serialize_object(args,threshold)
298 msg.append(sargs)
299 databuffers.extend(bufs)
300 skwargs, bufs = serialize_object(kwargs,threshold)
301 msg.append(skwargs)
302 databuffers.extend(bufs)
303 msg.extend(databuffers)
304 return msg
305
306 def unpack_apply_message(bufs, g=None, copy=True):
307 """unpack f,args,kwargs from buffers packed by pack_apply_message()
308 Returns: original f,args,kwargs"""
309 bufs = list(bufs) # allow us to pop
310 assert len(bufs) >= 3, "not enough buffers!"
311 if not copy:
312 for i in range(3):
313 bufs[i] = bufs[i].bytes
314 cf = pickle.loads(bufs.pop(0))
315 sargs = list(pickle.loads(bufs.pop(0)))
316 skwargs = dict(pickle.loads(bufs.pop(0)))
317 # print sargs, skwargs
318 f = uncan(cf, g)
319 for sa in sargs:
320 if sa.data is None:
321 m = bufs.pop(0)
322 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
323 # always use a buffer, until memoryviews get sorted out
324 sa.data = buffer(m)
325 # disable memoryview support
326 # if copy:
327 # sa.data = buffer(m)
328 # else:
329 # sa.data = m.buffer
330 else:
331 if copy:
332 sa.data = m
333 else:
334 sa.data = m.bytes
335
336 args = uncanSequence(map(unserialize, sargs), g)
337 kwargs = {}
338 for k in sorted(skwargs.iterkeys()):
339 sa = skwargs[k]
340 if sa.data is None:
341 m = bufs.pop(0)
342 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
343 # always use a buffer, until memoryviews get sorted out
344 sa.data = buffer(m)
345 # disable memoryview support
346 # if copy:
347 # sa.data = buffer(m)
348 # else:
349 # sa.data = m.buffer
350 else:
351 if copy:
352 sa.data = m
353 else:
354 sa.data = m.bytes
355
356 kwargs[k] = uncan(unserialize(sa), g)
357
358 return f,args,kwargs
359
222
360 #--------------------------------------------------------------------------
223 #--------------------------------------------------------------------------
361 # helpers for implementing old MEC API via view.apply
224 # helpers for implementing old MEC API via view.apply
@@ -43,6 +43,7 b' class ZMQShellDisplayHook(DisplayHook):'
43 """A displayhook subclass that publishes data using ZeroMQ. This is intended
43 """A displayhook subclass that publishes data using ZeroMQ. This is intended
44 to work with an InteractiveShell instance. It sends a dict of different
44 to work with an InteractiveShell instance. It sends a dict of different
45 representations of the object."""
45 representations of the object."""
46 topic=None
46
47
47 session = Instance(Session)
48 session = Instance(Session)
48 pub_socket = Instance('zmq.Socket')
49 pub_socket = Instance('zmq.Socket')
@@ -67,6 +68,6 b' class ZMQShellDisplayHook(DisplayHook):'
67 """Finish up all displayhook activities."""
68 """Finish up all displayhook activities."""
68 sys.stdout.flush()
69 sys.stdout.flush()
69 sys.stderr.flush()
70 sys.stderr.flush()
70 self.session.send(self.pub_socket, self.msg)
71 self.session.send(self.pub_socket, self.msg, ident=self.topic)
71 self.msg = None
72 self.msg = None
72
73
@@ -20,8 +20,10 b' import sys'
20 import zmq
20 import zmq
21
21
22 # Local imports.
22 # Local imports.
23 from IPython.config.application import Application
23 from IPython.utils import io
24 from IPython.utils import io
24
25
26
25 #------------------------------------------------------------------------------
27 #------------------------------------------------------------------------------
26 # Eventloops for integrating the Kernel into different GUIs
28 # Eventloops for integrating the Kernel into different GUIs
27 #------------------------------------------------------------------------------
29 #------------------------------------------------------------------------------
@@ -164,7 +166,10 b' def loop_cocoa(kernel):'
164 # but still need a Poller for when there are no active windows,
166 # but still need a Poller for when there are no active windows,
165 # during which time mainloop() returns immediately
167 # during which time mainloop() returns immediately
166 poller = zmq.Poller()
168 poller = zmq.Poller()
167 poller.register(kernel.shell_socket, zmq.POLLIN)
169 if kernel.control_stream:
170 poller.register(kernel.control_stream.socket, zmq.POLLIN)
171 for stream in kernel.shell_streams:
172 poller.register(stream.socket, zmq.POLLIN)
168
173
169 while True:
174 while True:
170 try:
175 try:
@@ -203,11 +208,15 b' loop_map = {'
203
208
204 def enable_gui(gui, kernel=None):
209 def enable_gui(gui, kernel=None):
205 """Enable integration with a given GUI"""
210 """Enable integration with a given GUI"""
206 if kernel is None:
207 from .ipkernel import IPKernelApp
208 kernel = IPKernelApp.instance().kernel
209 if gui not in loop_map:
211 if gui not in loop_map:
210 raise ValueError("GUI %r not supported" % gui)
212 raise ValueError("GUI %r not supported" % gui)
213 if kernel is None:
214 if Application.initialized():
215 kernel = getattr(Application.instance(), 'kernel', None)
216 if kernel is None:
217 raise RuntimeError("You didn't specify a kernel,"
218 " and no IPython Application with a kernel appears to be running."
219 )
211 loop = loop_map[gui]
220 loop = loop_map[gui]
212 if kernel.eventloop is not None and kernel.eventloop is not loop:
221 if kernel.eventloop is not None and kernel.eventloop is not loop:
213 raise RuntimeError("Cannot activate multiple GUI eventloops")
222 raise RuntimeError("Cannot activate multiple GUI eventloops")
@@ -15,20 +15,26 b' Things to do:'
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports.
18 # Standard library imports
19 import __builtin__
19 import __builtin__
20 import atexit
20 import atexit
21 import sys
21 import sys
22 import time
22 import time
23 import traceback
23 import traceback
24 import logging
24 import logging
25 import uuid
26
27 from datetime import datetime
25 from signal import (
28 from signal import (
26 signal, default_int_handler, SIGINT, SIG_IGN
29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
27 )
30 )
28 # System library imports.
31
32 # System library imports
29 import zmq
33 import zmq
34 from zmq.eventloop import ioloop
35 from zmq.eventloop.zmqstream import ZMQStream
30
36
31 # Local imports.
37 # Local imports
32 from IPython.core import pylabtools
38 from IPython.core import pylabtools
33 from IPython.config.configurable import Configurable
39 from IPython.config.configurable import Configurable
34 from IPython.config.application import boolean_flag, catch_config_error
40 from IPython.config.application import boolean_flag, catch_config_error
@@ -42,11 +48,12 b' from IPython.utils import py3compat'
42 from IPython.utils.frame import extract_module_locals
48 from IPython.utils.frame import extract_module_locals
43 from IPython.utils.jsonutil import json_clean
49 from IPython.utils.jsonutil import json_clean
44 from IPython.utils.traitlets import (
50 from IPython.utils.traitlets import (
45 Any, Instance, Float, Dict, CaselessStrEnum
51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
46 )
52 )
47
53
48 from entry_point import base_launch_kernel
54 from entry_point import base_launch_kernel
49 from kernelapp import KernelApp, kernel_flags, kernel_aliases
55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 from serialize import serialize_object, unpack_apply_message
50 from session import Session, Message
57 from session import Session, Message
51 from zmqshell import ZMQInteractiveShell
58 from zmqshell import ZMQInteractiveShell
52
59
@@ -63,16 +70,21 b' class Kernel(Configurable):'
63
70
64 # attribute to override with a GUI
71 # attribute to override with a GUI
65 eventloop = Any(None)
72 eventloop = Any(None)
73 def _eventloop_changed(self, name, old, new):
74 """schedule call to eventloop from IOLoop"""
75 loop = ioloop.IOLoop.instance()
76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
66
77
67 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
68 session = Instance(Session)
79 session = Instance(Session)
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 shell_socket = Instance('zmq.Socket')
81 shell_streams = List()
71 iopub_socket = Instance('zmq.Socket')
82 control_stream = Instance(ZMQStream)
72 stdin_socket = Instance('zmq.Socket')
83 iopub_socket = Instance(zmq.Socket)
84 stdin_socket = Instance(zmq.Socket)
73 log = Instance(logging.Logger)
85 log = Instance(logging.Logger)
74
86
75 user_module = Instance('types.ModuleType')
87 user_module = Any()
76 def _user_module_changed(self, name, old, new):
88 def _user_module_changed(self, name, old, new):
77 if self.shell is not None:
89 if self.shell is not None:
78 self.shell.user_module = new
90 self.shell.user_module = new
@@ -83,6 +95,14 b' class Kernel(Configurable):'
83 self.shell.user_ns = new
95 self.shell.user_ns = new
84 self.shell.init_user_ns()
96 self.shell.init_user_ns()
85
97
98 # identities:
99 int_id = Integer(-1)
100 ident = Unicode()
101
102 def _ident_default(self):
103 return unicode(uuid.uuid4())
104
105
86 # Private interface
106 # Private interface
87
107
88 # Time to sleep after flushing the stdout/err buffers in each execute
108 # Time to sleep after flushing the stdout/err buffers in each execute
@@ -110,15 +130,13 b' class Kernel(Configurable):'
110 # by record_ports and used by connect_request.
130 # by record_ports and used by connect_request.
111 _recorded_ports = Dict()
131 _recorded_ports = Dict()
112
132
133 # set of aborted msg_ids
134 aborted = Set()
113
135
114
136
115 def __init__(self, **kwargs):
137 def __init__(self, **kwargs):
116 super(Kernel, self).__init__(**kwargs)
138 super(Kernel, self).__init__(**kwargs)
117
139
118 # Before we even start up the shell, register *first* our exit handlers
119 # so they come before the shell's
120 atexit.register(self._at_shutdown)
121
122 # Initialize the InteractiveShell subclass
140 # Initialize the InteractiveShell subclass
123 self.shell = ZMQInteractiveShell.instance(config=self.config,
141 self.shell = ZMQInteractiveShell.instance(config=self.config,
124 profile_dir = self.profile_dir,
142 profile_dir = self.profile_dir,
@@ -127,6 +145,7 b' class Kernel(Configurable):'
127 )
145 )
128 self.shell.displayhook.session = self.session
146 self.shell.displayhook.session = self.session
129 self.shell.displayhook.pub_socket = self.iopub_socket
147 self.shell.displayhook.pub_socket = self.iopub_socket
148 self.shell.displayhook.topic = self._topic('pyout')
130 self.shell.display_pub.session = self.session
149 self.shell.display_pub.session = self.session
131 self.shell.display_pub.pub_socket = self.iopub_socket
150 self.shell.display_pub.pub_socket = self.iopub_socket
132
151
@@ -136,96 +155,131 b' class Kernel(Configurable):'
136 # Build dict of handlers for message types
155 # Build dict of handlers for message types
137 msg_types = [ 'execute_request', 'complete_request',
156 msg_types = [ 'execute_request', 'complete_request',
138 'object_info_request', 'history_request',
157 'object_info_request', 'history_request',
139 'connect_request', 'shutdown_request']
158 'connect_request', 'shutdown_request',
140 self.handlers = {}
159 'apply_request',
160 ]
161 self.shell_handlers = {}
141 for msg_type in msg_types:
162 for msg_type in msg_types:
142 self.handlers[msg_type] = getattr(self, msg_type)
163 self.shell_handlers[msg_type] = getattr(self, msg_type)
143
164
144 def do_one_iteration(self):
165 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
145 """Do one iteration of the kernel's evaluation loop.
166 self.control_handlers = {}
146 """
167 for msg_type in control_msg_types:
168 self.control_handlers[msg_type] = getattr(self, msg_type)
169
170 def dispatch_control(self, msg):
171 """dispatch control requests"""
172 idents,msg = self.session.feed_identities(msg, copy=False)
147 try:
173 try:
148 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
174 msg = self.session.unserialize(msg, content=True, copy=False)
149 except Exception:
175 except:
150 self.log.warn("Invalid Message:", exc_info=True)
176 self.log.error("Invalid Control Message", exc_info=True)
151 return
177 return
152 if msg is None:
178
179 self.log.debug("Control received: %s", msg)
180
181 header = msg['header']
182 msg_id = header['msg_id']
183 msg_type = header['msg_type']
184
185 handler = self.control_handlers.get(msg_type, None)
186 if handler is None:
187 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
188 else:
189 try:
190 handler(self.control_stream, idents, msg)
191 except Exception:
192 self.log.error("Exception in control handler:", exc_info=True)
193
194 def dispatch_shell(self, stream, msg):
195 """dispatch shell requests"""
196 # flush control requests first
197 if self.control_stream:
198 self.control_stream.flush()
199
200 idents,msg = self.session.feed_identities(msg, copy=False)
201 try:
202 msg = self.session.unserialize(msg, content=True, copy=False)
203 except:
204 self.log.error("Invalid Message", exc_info=True)
153 return
205 return
154
206
207 header = msg['header']
208 msg_id = header['msg_id']
155 msg_type = msg['header']['msg_type']
209 msg_type = msg['header']['msg_type']
156
210
157 # This assert will raise in versions of zeromq 2.0.7 and lesser.
158 # We now require 2.0.8 or above, so we can uncomment for safety.
159 # print(ident,msg, file=sys.__stdout__)
160 assert ident is not None, "Missing message part."
161
162 # Print some info about this message and leave a '--->' marker, so it's
211 # Print some info about this message and leave a '--->' marker, so it's
163 # easier to trace visually the message chain when debugging. Each
212 # easier to trace visually the message chain when debugging. Each
164 # handler prints its message at the end.
213 # handler prints its message at the end.
165 self.log.debug('\n*** MESSAGE TYPE:'+str(msg_type)+'***')
214 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
166 self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')
215 self.log.debug(' Content: %s\n --->\n ', msg['content'])
216
217 if msg_id in self.aborted:
218 self.aborted.remove(msg_id)
219 # is it safe to assume a msg_id will not be resubmitted?
220 reply_type = msg_type.split('_')[0] + '_reply'
221 status = {'status' : 'aborted'}
222 sub = {'engine' : self.ident}
223 sub.update(status)
224 reply_msg = self.session.send(stream, reply_type, subheader=sub,
225 content=status, parent=msg, ident=idents)
226 return
167
227
168 # Find and call actual handler for message
228 handler = self.shell_handlers.get(msg_type, None)
169 handler = self.handlers.get(msg_type, None)
170 if handler is None:
229 if handler is None:
171 self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
230 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
172 else:
231 else:
173 handler(ident, msg)
232 # ensure default_int_handler during handler call
174
233 sig = signal(SIGINT, default_int_handler)
175 # Check whether we should exit, in case the incoming message set the
176 # exit flag on
177 if self.shell.exit_now:
178 self.log.debug('\nExiting IPython kernel...')
179 # We do a normal, clean exit, which allows any actions registered
180 # via atexit (such as history saving) to take place.
181 sys.exit(0)
182
183
184 def start(self):
185 """ Start the kernel main loop.
186 """
187 # a KeyboardInterrupt (SIGINT) can occur on any python statement, so
188 # let's ignore (SIG_IGN) them until we're in a place to handle them properly
189 signal(SIGINT,SIG_IGN)
190 poller = zmq.Poller()
191 poller.register(self.shell_socket, zmq.POLLIN)
192 # loop while self.eventloop has not been overridden
193 while self.eventloop is None:
194 try:
195 # scale by extra factor of 10, because there is no
196 # reason for this to be anything less than ~ 0.1s
197 # since it is a real poller and will respond
198 # to events immediately
199
200 # double nested try/except, to properly catch KeyboardInterrupt
201 # due to pyzmq Issue #130
202 try:
234 try:
203 poller.poll(10*1000*self._poll_interval)
235 handler(stream, idents, msg)
204 # restore raising of KeyboardInterrupt
236 except Exception:
205 signal(SIGINT, default_int_handler)
237 self.log.error("Exception in message handler:", exc_info=True)
206 self.do_one_iteration()
207 except:
208 raise
209 finally:
238 finally:
210 # prevent raising of KeyboardInterrupt
239 signal(SIGINT, sig)
211 signal(SIGINT,SIG_IGN)
240
212 except KeyboardInterrupt:
241 def enter_eventloop(self):
213 # Ctrl-C shouldn't crash the kernel
242 """enter eventloop"""
214 io.raw_print("KeyboardInterrupt caught in kernel")
243 self.log.info("entering eventloop")
215 # stop ignoring sigint, now that we are out of our own loop,
244 # restore default_int_handler
216 # we don't want to prevent future code from handling it
217 signal(SIGINT, default_int_handler)
245 signal(SIGINT, default_int_handler)
218 while self.eventloop is not None:
246 while self.eventloop is not None:
219 try:
247 try:
220 self.eventloop(self)
248 self.eventloop(self)
221 except KeyboardInterrupt:
249 except KeyboardInterrupt:
222 # Ctrl-C shouldn't crash the kernel
250 # Ctrl-C shouldn't crash the kernel
223 io.raw_print("KeyboardInterrupt caught in kernel")
251 self.log.error("KeyboardInterrupt caught in kernel")
224 continue
252 continue
225 else:
253 else:
226 # eventloop exited cleanly, this means we should stop (right?)
254 # eventloop exited cleanly, this means we should stop (right?)
227 self.eventloop = None
255 self.eventloop = None
228 break
256 break
257 self.log.info("exiting eventloop")
258 # if eventloop exits, IOLoop should stop
259 ioloop.IOLoop.instance().stop()
260
261 def start(self):
262 """register dispatchers for streams"""
263 self.shell.exit_now = False
264 if self.control_stream:
265 self.control_stream.on_recv(self.dispatch_control, copy=False)
266
267 def make_dispatcher(stream):
268 def dispatcher(msg):
269 return self.dispatch_shell(stream, msg)
270 return dispatcher
271
272 for s in self.shell_streams:
273 s.on_recv(make_dispatcher(s), copy=False)
274
275 def do_one_iteration(self):
276 """step eventloop just once"""
277 if self.control_stream:
278 self.control_stream.flush()
279 for stream in self.shell_streams:
280 # handle at most one request per iteration
281 stream.flush(zmq.POLLIN, 1)
282 stream.flush(zmq.POLLOUT)
229
283
230
284
231 def record_ports(self, ports):
285 def record_ports(self, ports):
@@ -240,18 +294,30 b' class Kernel(Configurable):'
240 # Kernel request handlers
294 # Kernel request handlers
241 #---------------------------------------------------------------------------
295 #---------------------------------------------------------------------------
242
296
297 def _make_subheader(self):
298 """init subheader dict, for execute/apply_reply"""
299 return {
300 'dependencies_met' : True,
301 'engine' : self.ident,
302 'started': datetime.now(),
303 }
304
243 def _publish_pyin(self, code, parent, execution_count):
305 def _publish_pyin(self, code, parent, execution_count):
244 """Publish the code request on the pyin stream."""
306 """Publish the code request on the pyin stream."""
245
307
246 self.session.send(self.iopub_socket, u'pyin', {u'code':code,
308 self.session.send(self.iopub_socket, u'pyin',
247 u'execution_count': execution_count}, parent=parent)
309 {u'code':code, u'execution_count': execution_count},
310 parent=parent, ident=self._topic('pyin')
311 )
248
312
249 def execute_request(self, ident, parent):
313 def execute_request(self, stream, ident, parent):
250
314
251 self.session.send(self.iopub_socket,
315 self.session.send(self.iopub_socket,
252 u'status',
316 u'status',
253 {u'execution_state':u'busy'},
317 {u'execution_state':u'busy'},
254 parent=parent )
318 parent=parent,
319 ident=self._topic('status'),
320 )
255
321
256 try:
322 try:
257 content = parent[u'content']
323 content = parent[u'content']
@@ -259,9 +325,11 b' class Kernel(Configurable):'
259 silent = content[u'silent']
325 silent = content[u'silent']
260 except:
326 except:
261 self.log.error("Got bad msg: ")
327 self.log.error("Got bad msg: ")
262 self.log.error(str(Message(parent)))
328 self.log.error("%s", parent)
263 return
329 return
264
330
331 sub = self._make_subheader()
332
265 shell = self.shell # we'll need this a lot here
333 shell = self.shell # we'll need this a lot here
266
334
267 # Replace raw_input. Note that is not sufficient to replace
335 # Replace raw_input. Note that is not sufficient to replace
@@ -289,14 +357,8 b' class Kernel(Configurable):'
289
357
290 reply_content = {}
358 reply_content = {}
291 try:
359 try:
292 if silent:
293 # run_code uses 'exec' mode, so no displayhook will fire, and it
294 # doesn't call logging or history manipulations. Print
295 # statements in that code will obviously still execute.
296 shell.run_code(code)
297 else:
298 # FIXME: the shell calls the exception handler itself.
360 # FIXME: the shell calls the exception handler itself.
299 shell.run_cell(code, store_history=True)
361 shell.run_cell(code, store_history=not silent, silent=silent)
300 except:
362 except:
301 status = u'error'
363 status = u'error'
302 # FIXME: this code right now isn't being used yet by default,
364 # FIXME: this code right now isn't being used yet by default,
@@ -327,9 +389,9 b' class Kernel(Configurable):'
327 # or not. If it did, we proceed to evaluate user_variables/expressions
389 # or not. If it did, we proceed to evaluate user_variables/expressions
328 if reply_content['status'] == 'ok':
390 if reply_content['status'] == 'ok':
329 reply_content[u'user_variables'] = \
391 reply_content[u'user_variables'] = \
330 shell.user_variables(content[u'user_variables'])
392 shell.user_variables(content.get(u'user_variables', []))
331 reply_content[u'user_expressions'] = \
393 reply_content[u'user_expressions'] = \
332 shell.user_expressions(content[u'user_expressions'])
394 shell.user_expressions(content.get(u'user_expressions', {}))
333 else:
395 else:
334 # If there was an error, don't even try to compute variables or
396 # If there was an error, don't even try to compute variables or
335 # expressions
397 # expressions
@@ -355,40 +417,49 b' class Kernel(Configurable):'
355
417
356 # Send the reply.
418 # Send the reply.
357 reply_content = json_clean(reply_content)
419 reply_content = json_clean(reply_content)
358 reply_msg = self.session.send(self.shell_socket, u'execute_reply',
359 reply_content, parent, ident=ident)
360 self.log.debug(str(reply_msg))
361
420
362 if reply_msg['content']['status'] == u'error':
421 sub['status'] = reply_content['status']
363 self._abort_queue()
422 if reply_content['status'] == 'error' and \
423 reply_content['ename'] == 'UnmetDependency':
424 sub['dependencies_met'] = False
425
426 reply_msg = self.session.send(stream, u'execute_reply',
427 reply_content, parent, subheader=sub,
428 ident=ident)
429
430 self.log.debug("%s", reply_msg)
431
432 if not silent and reply_msg['content']['status'] == u'error':
433 self._abort_queues()
364
434
365 self.session.send(self.iopub_socket,
435 self.session.send(self.iopub_socket,
366 u'status',
436 u'status',
367 {u'execution_state':u'idle'},
437 {u'execution_state':u'idle'},
368 parent=parent )
438 parent=parent,
439 ident=self._topic('status'))
369
440
370 def complete_request(self, ident, parent):
441 def complete_request(self, stream, ident, parent):
371 txt, matches = self._complete(parent)
442 txt, matches = self._complete(parent)
372 matches = {'matches' : matches,
443 matches = {'matches' : matches,
373 'matched_text' : txt,
444 'matched_text' : txt,
374 'status' : 'ok'}
445 'status' : 'ok'}
375 matches = json_clean(matches)
446 matches = json_clean(matches)
376 completion_msg = self.session.send(self.shell_socket, 'complete_reply',
447 completion_msg = self.session.send(stream, 'complete_reply',
377 matches, parent, ident)
448 matches, parent, ident)
378 self.log.debug(str(completion_msg))
449 self.log.debug("%s", completion_msg)
379
450
380 def object_info_request(self, ident, parent):
451 def object_info_request(self, stream, ident, parent):
381 content = parent['content']
452 content = parent['content']
382 object_info = self.shell.object_inspect(content['oname'],
453 object_info = self.shell.object_inspect(content['oname'],
383 detail_level = content.get('detail_level', 0)
454 detail_level = content.get('detail_level', 0)
384 )
455 )
385 # Before we send this object over, we scrub it for JSON usage
456 # Before we send this object over, we scrub it for JSON usage
386 oinfo = json_clean(object_info)
457 oinfo = json_clean(object_info)
387 msg = self.session.send(self.shell_socket, 'object_info_reply',
458 msg = self.session.send(stream, 'object_info_reply',
388 oinfo, parent, ident)
459 oinfo, parent, ident)
389 self.log.debug(msg)
460 self.log.debug("%s", msg)
390
461
391 def history_request(self, ident, parent):
462 def history_request(self, stream, ident, parent):
392 # We need to pull these out, as passing **kwargs doesn't work with
463 # We need to pull these out, as passing **kwargs doesn't work with
393 # unicode keys before Python 2.6.5.
464 # unicode keys before Python 2.6.5.
394 hist_access_type = parent['content']['hist_access_type']
465 hist_access_type = parent['content']['hist_access_type']
@@ -416,51 +487,183 b' class Kernel(Configurable):'
416 hist = list(hist)
487 hist = list(hist)
417 content = {'history' : hist}
488 content = {'history' : hist}
418 content = json_clean(content)
489 content = json_clean(content)
419 msg = self.session.send(self.shell_socket, 'history_reply',
490 msg = self.session.send(stream, 'history_reply',
420 content, parent, ident)
491 content, parent, ident)
421 self.log.debug("Sending history reply with %i entries", len(hist))
492 self.log.debug("Sending history reply with %i entries", len(hist))
422
493
423 def connect_request(self, ident, parent):
494 def connect_request(self, stream, ident, parent):
424 if self._recorded_ports is not None:
495 if self._recorded_ports is not None:
425 content = self._recorded_ports.copy()
496 content = self._recorded_ports.copy()
426 else:
497 else:
427 content = {}
498 content = {}
428 msg = self.session.send(self.shell_socket, 'connect_reply',
499 msg = self.session.send(stream, 'connect_reply',
429 content, parent, ident)
500 content, parent, ident)
430 self.log.debug(msg)
501 self.log.debug("%s", msg)
431
502
432 def shutdown_request(self, ident, parent):
503 def shutdown_request(self, stream, ident, parent):
433 self.shell.exit_now = True
504 self.shell.exit_now = True
505 content = dict(status='ok')
506 content.update(parent['content'])
507 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
508 # same content, but different msg_id for broadcasting on IOPub
434 self._shutdown_message = self.session.msg(u'shutdown_reply',
509 self._shutdown_message = self.session.msg(u'shutdown_reply',
435 parent['content'], parent)
510 content, parent
436 sys.exit(0)
511 )
512
513 self._at_shutdown()
514 # call sys.exit after a short delay
515 loop = ioloop.IOLoop.instance()
516 loop.add_timeout(time.time()+0.1, loop.stop)
517
518 #---------------------------------------------------------------------------
519 # Engine methods
520 #---------------------------------------------------------------------------
521
522 def apply_request(self, stream, ident, parent):
523 try:
524 content = parent[u'content']
525 bufs = parent[u'buffers']
526 msg_id = parent['header']['msg_id']
527 except:
528 self.log.error("Got bad msg: %s", parent, exc_info=True)
529 return
530
531 # Set the parent message of the display hook and out streams.
532 self.shell.displayhook.set_parent(parent)
533 self.shell.display_pub.set_parent(parent)
534 sys.stdout.set_parent(parent)
535 sys.stderr.set_parent(parent)
536
537 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
538 # self.iopub_socket.send(pyin_msg)
539 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
540 sub = self._make_subheader()
541 try:
542 working = self.shell.user_ns
543
544 prefix = "_"+str(msg_id).replace("-","")+"_"
545
546 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
547
548 fname = getattr(f, '__name__', 'f')
549
550 fname = prefix+"f"
551 argname = prefix+"args"
552 kwargname = prefix+"kwargs"
553 resultname = prefix+"result"
554
555 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
556 # print ns
557 working.update(ns)
558 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
559 try:
560 exec code in self.shell.user_global_ns, self.shell.user_ns
561 result = working.get(resultname)
562 finally:
563 for key in ns.iterkeys():
564 working.pop(key)
565
566 packed_result,buf = serialize_object(result)
567 result_buf = [packed_result]+buf
568 except:
569 exc_content = self._wrap_exception('apply')
570 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
571 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
572 ident=self._topic('pyerr'))
573 reply_content = exc_content
574 result_buf = []
575
576 if exc_content['ename'] == 'UnmetDependency':
577 sub['dependencies_met'] = False
578 else:
579 reply_content = {'status' : 'ok'}
580
581 # put 'ok'/'error' status in header, for scheduler introspection:
582 sub['status'] = reply_content['status']
583
584 # flush i/o
585 sys.stdout.flush()
586 sys.stderr.flush()
587
588 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
589 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
590
591 #---------------------------------------------------------------------------
592 # Control messages
593 #---------------------------------------------------------------------------
594
595 def abort_request(self, stream, ident, parent):
596 """abort a specifig msg by id"""
597 msg_ids = parent['content'].get('msg_ids', None)
598 if isinstance(msg_ids, basestring):
599 msg_ids = [msg_ids]
600 if not msg_ids:
601 self.abort_queues()
602 for mid in msg_ids:
603 self.aborted.add(str(mid))
604
605 content = dict(status='ok')
606 reply_msg = self.session.send(stream, 'abort_reply', content=content,
607 parent=parent, ident=ident)
608 self.log.debug("%s", reply_msg)
609
610 def clear_request(self, stream, idents, parent):
611 """Clear our namespace."""
612 self.shell.reset(False)
613 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
614 content = dict(status='ok'))
615
437
616
438 #---------------------------------------------------------------------------
617 #---------------------------------------------------------------------------
439 # Protected interface
618 # Protected interface
440 #---------------------------------------------------------------------------
619 #---------------------------------------------------------------------------
441
620
442 def _abort_queue(self):
621
622 def _wrap_exception(self, method=None):
623 # import here, because _wrap_exception is only used in parallel,
624 # and parallel has higher min pyzmq version
625 from IPython.parallel.error import wrap_exception
626 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
627 content = wrap_exception(e_info)
628 return content
629
630 def _topic(self, topic):
631 """prefixed topic for IOPub messages"""
632 if self.int_id >= 0:
633 base = "engine.%i" % self.int_id
634 else:
635 base = "kernel.%s" % self.ident
636
637 return py3compat.cast_bytes("%s.%s" % (base, topic))
638
639 def _abort_queues(self):
640 for stream in self.shell_streams:
641 if stream:
642 self._abort_queue(stream)
643
644 def _abort_queue(self, stream):
645 poller = zmq.Poller()
646 poller.register(stream.socket, zmq.POLLIN)
443 while True:
647 while True:
444 try:
648 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
445 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
446 except Exception:
447 self.log.warn("Invalid Message:", exc_info=True)
448 continue
449 if msg is None:
649 if msg is None:
450 break
650 return
451 else:
452 assert ident is not None, \
453 "Unexpected missing message part."
454
651
455 self.log.debug("Aborting:\n"+str(Message(msg)))
652 self.log.info("Aborting:")
653 self.log.info("%s", msg)
456 msg_type = msg['header']['msg_type']
654 msg_type = msg['header']['msg_type']
457 reply_type = msg_type.split('_')[0] + '_reply'
655 reply_type = msg_type.split('_')[0] + '_reply'
458 reply_msg = self.session.send(self.shell_socket, reply_type,
656
459 {'status' : 'aborted'}, msg, ident=ident)
657 status = {'status' : 'aborted'}
460 self.log.debug(reply_msg)
658 sub = {'engine' : self.ident}
659 sub.update(status)
660 reply_msg = self.session.send(stream, reply_type, subheader=sub,
661 content=status, parent=msg, ident=idents)
662 self.log.debug("%s", reply_msg)
461 # We need to wait a bit for requests to come in. This can probably
663 # We need to wait a bit for requests to come in. This can probably
462 # be set shorter for true asynchronous clients.
664 # be set shorter for true asynchronous clients.
463 time.sleep(0.1)
665 poller.poll(50)
666
464
667
465 def _no_raw_input(self):
668 def _no_raw_input(self):
466 """Raise StdinNotImplentedError if active frontend doesn't support
669 """Raise StdinNotImplentedError if active frontend doesn't support
@@ -490,7 +693,7 b' class Kernel(Configurable):'
490 value = reply['content']['value']
693 value = reply['content']['value']
491 except:
694 except:
492 self.log.error("Got bad raw_input reply: ")
695 self.log.error("Got bad raw_input reply: ")
493 self.log.error(str(Message(parent)))
696 self.log.error("%s", parent)
494 value = ''
697 value = ''
495 if value == '\x04':
698 if value == '\x04':
496 # EOF
699 # EOF
@@ -545,12 +748,9 b' class Kernel(Configurable):'
545 """
748 """
546 # io.rprint("Kernel at_shutdown") # dbg
749 # io.rprint("Kernel at_shutdown") # dbg
547 if self._shutdown_message is not None:
750 if self._shutdown_message is not None:
548 self.session.send(self.shell_socket, self._shutdown_message)
751 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
549 self.session.send(self.iopub_socket, self._shutdown_message)
752 self.log.debug("%s", self._shutdown_message)
550 self.log.debug(str(self._shutdown_message))
753 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
551 # A very short sleep to give zmq time to flush its message buffers
552 # before Python truly shuts down.
553 time.sleep(0.01)
554
754
555 #-----------------------------------------------------------------------------
755 #-----------------------------------------------------------------------------
556 # Aliases and Flags for the IPKernelApp
756 # Aliases and Flags for the IPKernelApp
@@ -604,8 +804,10 b' class IPKernelApp(KernelApp, InteractiveShellApp):'
604
804
605 def init_kernel(self):
805 def init_kernel(self):
606
806
807 shell_stream = ZMQStream(self.shell_socket)
808
607 kernel = Kernel(config=self.config, session=self.session,
809 kernel = Kernel(config=self.config, session=self.session,
608 shell_socket=self.shell_socket,
810 shell_streams=[shell_stream],
609 iopub_socket=self.iopub_socket,
811 iopub_socket=self.iopub_socket,
610 stdin_socket=self.stdin_socket,
812 stdin_socket=self.stdin_socket,
611 log=self.log,
813 log=self.log,
@@ -687,6 +889,13 b' def embed_kernel(module=None, local_ns=None, **kwargs):'
687 else:
889 else:
688 app = IPKernelApp.instance(**kwargs)
890 app = IPKernelApp.instance(**kwargs)
689 app.initialize([])
891 app.initialize([])
892 # Undo unnecessary sys module mangling from init_sys_modules.
893 # This would not be necessary if we could prevent it
894 # in the first place by using a different InteractiveShell
895 # subclass, as in the regular embed case.
896 main = app.kernel.shell._orig_sys_modules_main_mod
897 if main is not None:
898 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
690
899
691 # load the calling scope if not given
900 # load the calling scope if not given
692 (caller_module, caller_locals) = extract_module_locals(1)
901 (caller_module, caller_locals) = extract_module_locals(1)
@@ -15,15 +15,18 b' Authors'
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports
19 import atexit
19 import json
20 import json
20 import os
21 import os
21 import sys
22 import sys
23 import signal
22
24
23 # System library imports.
25 # System library imports
24 import zmq
26 import zmq
27 from zmq.eventloop import ioloop
25
28
26 # IPython imports.
29 # IPython imports
27 from IPython.core.ultratb import FormattedTB
30 from IPython.core.ultratb import FormattedTB
28 from IPython.core.application import (
31 from IPython.core.application import (
29 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
32 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
@@ -82,17 +85,18 b' kernel_flags.update(session_flags)'
82 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
83
86
84 class KernelApp(BaseIPythonApplication):
87 class KernelApp(BaseIPythonApplication):
85 name='pykernel'
88 name='ipkernel'
86 aliases = Dict(kernel_aliases)
89 aliases = Dict(kernel_aliases)
87 flags = Dict(kernel_flags)
90 flags = Dict(kernel_flags)
88 classes = [Session]
91 classes = [Session]
89 # the kernel class, as an importstring
92 # the kernel class, as an importstring
90 kernel_class = DottedObjectName('IPython.zmq.pykernel.Kernel')
93 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
91 kernel = Any()
94 kernel = Any()
92 poller = Any() # don't restrict this even though current pollers are all Threads
95 poller = Any() # don't restrict this even though current pollers are all Threads
93 heartbeat = Instance(Heartbeat)
96 heartbeat = Instance(Heartbeat)
94 session = Instance('IPython.zmq.session.Session')
97 session = Instance('IPython.zmq.session.Session')
95 ports = Dict()
98 ports = Dict()
99 _full_connection_file = Unicode()
96
100
97 # inherit config file name from parent:
101 # inherit config file name from parent:
98 parent_appname = Unicode(config=True)
102 parent_appname = Unicode(config=True)
@@ -163,6 +167,8 b' class KernelApp(BaseIPythonApplication):'
163 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
167 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
164 except IOError:
168 except IOError:
165 self.log.debug("Connection file not found: %s", self.connection_file)
169 self.log.debug("Connection file not found: %s", self.connection_file)
170 # This means I own it, so I will clean it up:
171 atexit.register(self.cleanup_connection_file)
166 return
172 return
167 self.log.debug(u"Loading connection file %s", fname)
173 self.log.debug(u"Loading connection file %s", fname)
168 with open(fname) as f:
174 with open(fname) as f:
@@ -189,6 +195,16 b' class KernelApp(BaseIPythonApplication):'
189 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
195 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
190 iopub_port=self.iopub_port)
196 iopub_port=self.iopub_port)
191
197
198 self._full_connection_file = cf
199
200 def cleanup_connection_file(self):
201 cf = self._full_connection_file
202 self.log.debug("cleaning up connection file: %r", cf)
203 try:
204 os.remove(cf)
205 except (IOError, OSError):
206 pass
207
192 def init_connection_file(self):
208 def init_connection_file(self):
193 if not self.connection_file:
209 if not self.connection_file:
194 self.connection_file = "kernel-%s.json"%os.getpid()
210 self.connection_file = "kernel-%s.json"%os.getpid()
@@ -217,17 +233,22 b' class KernelApp(BaseIPythonApplication):'
217 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
233 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
218 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
234 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
219
235
236 def init_heartbeat(self):
237 """start the heart beating"""
220 # heartbeat doesn't share context, because it mustn't be blocked
238 # heartbeat doesn't share context, because it mustn't be blocked
221 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
239 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
222 hb_ctx = zmq.Context()
240 hb_ctx = zmq.Context()
223 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
241 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
224 self.hb_port = self.heartbeat.port
242 self.hb_port = self.heartbeat.port
225 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
243 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
244 self.heartbeat.start()
226
245
227 # Helper to make it easier to connect to an existing kernel.
246 # Helper to make it easier to connect to an existing kernel.
228 # set log-level to critical, to make sure it is output
247 # set log-level to critical, to make sure it is output
229 self.log.critical("To connect another client to this kernel, use:")
248 self.log.critical("To connect another client to this kernel, use:")
230
249
250 def log_connection_info(self):
251 """display connection info, and store ports"""
231 basename = os.path.basename(self.connection_file)
252 basename = os.path.basename(self.connection_file)
232 if basename == self.connection_file or \
253 if basename == self.connection_file or \
233 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
254 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
@@ -267,6 +288,9 b' class KernelApp(BaseIPythonApplication):'
267 displayhook_factory = import_item(str(self.displayhook_class))
288 displayhook_factory = import_item(str(self.displayhook_class))
268 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
289 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
269
290
291 def init_signal(self):
292 signal.signal(signal.SIGINT, signal.SIG_IGN)
293
270 def init_kernel(self):
294 def init_kernel(self):
271 """Create the Kernel object itself"""
295 """Create the Kernel object itself"""
272 kernel_factory = import_item(str(self.kernel_class))
296 kernel_factory = import_item(str(self.kernel_class))
@@ -286,9 +310,12 b' class KernelApp(BaseIPythonApplication):'
286 self.init_session()
310 self.init_session()
287 self.init_poller()
311 self.init_poller()
288 self.init_sockets()
312 self.init_sockets()
289 # writing connection file must be *after* init_sockets
313 self.init_heartbeat()
314 # writing/displaying connection info must be *after* init_sockets/heartbeat
315 self.log_connection_info()
290 self.write_connection_file()
316 self.write_connection_file()
291 self.init_io()
317 self.init_io()
318 self.init_signal()
292 self.init_kernel()
319 self.init_kernel()
293 # flush stdout/stderr, so that anything written to these streams during
320 # flush stdout/stderr, so that anything written to these streams during
294 # initialization do not get associated with the first execution request
321 # initialization do not get associated with the first execution request
@@ -296,11 +323,11 b' class KernelApp(BaseIPythonApplication):'
296 sys.stderr.flush()
323 sys.stderr.flush()
297
324
298 def start(self):
325 def start(self):
299 self.heartbeat.start()
300 if self.poller is not None:
326 if self.poller is not None:
301 self.poller.start()
327 self.poller.start()
302 try:
303 self.kernel.start()
328 self.kernel.start()
329 try:
330 ioloop.IOLoop.instance().start()
304 except KeyboardInterrupt:
331 except KeyboardInterrupt:
305 pass
332 pass
306
333
@@ -781,9 +781,6 b' class KernelManager(HasTraits):'
781
781
782 Parameters:
782 Parameters:
783 -----------
783 -----------
784 ipython : bool, optional (default True)
785 Whether to use an IPython kernel instead of a plain Python kernel.
786
787 launcher : callable, optional (default None)
784 launcher : callable, optional (default None)
788 A custom function for launching the kernel process (generally a
785 A custom function for launching the kernel process (generally a
789 wrapper around ``entry_point.base_launch_kernel``). In most cases,
786 wrapper around ``entry_point.base_launch_kernel``). In most cases,
@@ -805,10 +802,7 b' class KernelManager(HasTraits):'
805 self._launch_args = kw.copy()
802 self._launch_args = kw.copy()
806 launch_kernel = kw.pop('launcher', None)
803 launch_kernel = kw.pop('launcher', None)
807 if launch_kernel is None:
804 if launch_kernel is None:
808 if kw.pop('ipython', True):
809 from ipkernel import launch_kernel
805 from ipkernel import launch_kernel
810 else:
811 from pykernel import launch_kernel
812 self.kernel = launch_kernel(fname=self.connection_file, **kw)
806 self.kernel = launch_kernel(fname=self.connection_file, **kw)
813
807
814 def shutdown_kernel(self, restart=False):
808 def shutdown_kernel(self, restart=False):
@@ -629,7 +629,7 b' class Session(Configurable):'
629 if isinstance(socket, ZMQStream):
629 if isinstance(socket, ZMQStream):
630 socket = socket.socket
630 socket = socket.socket
631 try:
631 try:
632 msg_list = socket.recv_multipart(mode)
632 msg_list = socket.recv_multipart(mode, copy=copy)
633 except zmq.ZMQError as e:
633 except zmq.ZMQError as e:
634 if e.errno == zmq.EAGAIN:
634 if e.errno == zmq.EAGAIN:
635 # We can convert EAGAIN to None as we know in this case
635 # We can convert EAGAIN to None as we know in this case
@@ -23,7 +23,7 b' from subprocess import Popen, PIPE'
23 import nose.tools as nt
23 import nose.tools as nt
24
24
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 from IPython.utils import path
26 from IPython.utils import path, py3compat
27
27
28
28
29 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
@@ -68,17 +68,19 b' def setup_kernel(cmd):'
68 )
68 )
69 # wait for connection file to exist, timeout after 5s
69 # wait for connection file to exist, timeout after 5s
70 tic = time.time()
70 tic = time.time()
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
72 time.sleep(0.1)
72 time.sleep(0.1)
73
73
74 if kernel.poll() is not None:
75 o,e = kernel.communicate()
76 e = py3compat.cast_unicode(e)
77 raise IOError("Kernel failed to start:\n%s" % e)
78
74 if not os.path.exists(connection_file):
79 if not os.path.exists(connection_file):
75 if kernel.poll() is None:
80 if kernel.poll() is None:
76 kernel.terminate()
81 kernel.terminate()
77 raise IOError("Connection file %r never arrived" % connection_file)
82 raise IOError("Connection file %r never arrived" % connection_file)
78
83
79 if kernel.poll() is not None:
80 raise IOError("Kernel failed to start")
81
82 km = BlockingKernelManager(connection_file=connection_file)
84 km = BlockingKernelManager(connection_file=connection_file)
83 km.load_connection_file()
85 km.load_connection_file()
84 km.start_channels()
86 km.start_channels()
@@ -157,3 +159,33 b' def test_embed_kernel_namespace():'
157 content = msg['content']
159 content = msg['content']
158 nt.assert_false(content['found'])
160 nt.assert_false(content['found'])
159
161
162 def test_embed_kernel_reentrant():
163 """IPython.embed_kernel() can be called multiple times"""
164 cmd = '\n'.join([
165 'from IPython import embed_kernel',
166 'count = 0',
167 'def go():',
168 ' global count',
169 ' embed_kernel()',
170 ' count = count + 1',
171 '',
172 'while True:'
173 ' go()',
174 '',
175 ])
176
177 with setup_kernel(cmd) as km:
178 shell = km.shell_channel
179 for i in range(5):
180 msg_id = shell.object_info('count')
181 msg = shell.get_msg(block=True, timeout=2)
182 content = msg['content']
183 nt.assert_true(content['found'])
184 nt.assert_equals(content['string_form'], unicode(i))
185
186 # exit from embed_kernel
187 shell.execute("get_ipython().exit_now = True")
188 msg = shell.get_msg(block=True, timeout=2)
189 time.sleep(0.2)
190
191
@@ -19,8 +19,12 b' from __future__ import print_function'
19 import inspect
19 import inspect
20 import os
20 import os
21 import sys
21 import sys
22 import time
22 from subprocess import Popen, PIPE
23 from subprocess import Popen, PIPE
23
24
25 # System library imports
26 from zmq.eventloop import ioloop
27
24 # Our own
28 # Our own
25 from IPython.core.interactiveshell import (
29 from IPython.core.interactiveshell import (
26 InteractiveShell, InteractiveShellABC
30 InteractiveShell, InteractiveShellABC
@@ -39,7 +43,7 b' from IPython.utils import io'
39 from IPython.utils.jsonutil import json_clean
43 from IPython.utils.jsonutil import json_clean
40 from IPython.utils.path import get_py_filename
44 from IPython.utils.path import get_py_filename
41 from IPython.utils.process import arg_split
45 from IPython.utils.process import arg_split
42 from IPython.utils.traitlets import Instance, Type, Dict, CBool
46 from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes
43 from IPython.utils.warn import warn, error
47 from IPython.utils.warn import warn, error
44 from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary
48 from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary
45 from IPython.zmq.session import extract_header
49 from IPython.zmq.session import extract_header
@@ -56,6 +60,7 b' class ZMQDisplayPublisher(DisplayPublisher):'
56 session = Instance(Session)
60 session = Instance(Session)
57 pub_socket = Instance('zmq.Socket')
61 pub_socket = Instance('zmq.Socket')
58 parent_header = Dict({})
62 parent_header = Dict({})
63 topic = CBytes(b'displaypub')
59
64
60 def set_parent(self, parent):
65 def set_parent(self, parent):
61 """Set the parent for outbound messages."""
66 """Set the parent for outbound messages."""
@@ -78,7 +83,7 b' class ZMQDisplayPublisher(DisplayPublisher):'
78 content['metadata'] = metadata
83 content['metadata'] = metadata
79 self.session.send(
84 self.session.send(
80 self.pub_socket, u'display_data', json_clean(content),
85 self.pub_socket, u'display_data', json_clean(content),
81 parent=self.parent_header
86 parent=self.parent_header, ident=self.topic,
82 )
87 )
83
88
84 def clear_output(self, stdout=True, stderr=True, other=True):
89 def clear_output(self, stdout=True, stderr=True, other=True):
@@ -93,7 +98,7 b' class ZMQDisplayPublisher(DisplayPublisher):'
93
98
94 self.session.send(
99 self.session.send(
95 self.pub_socket, u'clear_output', content,
100 self.pub_socket, u'clear_output', content,
96 parent=self.parent_header
101 parent=self.parent_header, ident=self.topic,
97 )
102 )
98
103
99 class ZMQInteractiveShell(InteractiveShell):
104 class ZMQInteractiveShell(InteractiveShell):
@@ -115,6 +120,12 b' class ZMQInteractiveShell(InteractiveShell):'
115 def _exiter_default(self):
120 def _exiter_default(self):
116 return ZMQExitAutocall(self)
121 return ZMQExitAutocall(self)
117
122
123 def _exit_now_changed(self, name, old, new):
124 """stop eventloop when exit_now fires"""
125 if new:
126 loop = ioloop.IOLoop.instance()
127 loop.add_timeout(time.time()+0.1, loop.stop)
128
118 keepkernel_on_exit = None
129 keepkernel_on_exit = None
119
130
120 # Over ZeroMQ, GUI control isn't done with PyOS_InputHook as there is no
131 # Over ZeroMQ, GUI control isn't done with PyOS_InputHook as there is no
@@ -154,6 +165,7 b' class ZMQInteractiveShell(InteractiveShell):'
154
165
155 def ask_exit(self):
166 def ask_exit(self):
156 """Engage the exit actions."""
167 """Engage the exit actions."""
168 self.exit_now = True
157 payload = dict(
169 payload = dict(
158 source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit',
170 source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit',
159 exit=True,
171 exit=True,
@@ -172,7 +184,11 b' class ZMQInteractiveShell(InteractiveShell):'
172 dh = self.displayhook
184 dh = self.displayhook
173 # Send exception info over pub socket for other clients than the caller
185 # Send exception info over pub socket for other clients than the caller
174 # to pick up
186 # to pick up
175 exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header)
187 topic = None
188 if dh.topic:
189 topic = dh.topic.replace(b'pyout', b'pyerr')
190
191 exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header, ident=topic)
176
192
177 # FIXME - Hack: store exception info in shell object. Right now, the
193 # FIXME - Hack: store exception info in shell object. Right now, the
178 # caller is reading this info after the fact, we need to fix this logic
194 # caller is reading this info after the fact, we need to fix this logic
@@ -34,7 +34,7 b' TaskRecord keys:'
34 =============== =============== =============
34 =============== =============== =============
35 Key Type Description
35 Key Type Description
36 =============== =============== =============
36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
37 msg_id uuid(ascii) The msg ID
38 header dict The request header
38 header dict The request header
39 content dict The request content (likely empty)
39 content dict The request content (likely empty)
40 buffers list(bytes) buffers containing serialized request objects
40 buffers list(bytes) buffers containing serialized request objects
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket"
43 engine_uuid uuid(bytes) IDENT of engine's socket
43 engine_uuid uuid(bytes) IDENT of engine's socket
44 started datetime time task began execution on engine
44 started datetime time task began execution on engine
45 completed datetime time task finished execution (success or failure) on engine
45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
46 resubmitted uuid(ascii) msg_id of resubmitted task (if applicable)
47 result_header dict header for result
47 result_header dict header for result
48 result_content dict content for result
48 result_content dict content for result
49 result_buffers list(bytes) buffers containing serialized request objects
49 result_buffers list(bytes) buffers containing serialized request objects
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now