##// END OF EJS Templates
Merge pull request #1630 from minrk/mergekernel...
Brian E. Granger -
r6905:6cb2026f merge
parent child Browse files
Show More
@@ -0,0 +1,179 b''
1 """serialization utilities for apply messages
2
3 Authors:
4
5 * Min RK
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
18 # Standard library imports
19 import logging
20 import os
21 import re
22 import socket
23 import sys
24
25 try:
26 import cPickle
27 pickle = cPickle
28 except:
29 cPickle = None
30 import pickle
31
32
33 # IPython imports
34 from IPython.utils import py3compat
35 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
36 from IPython.utils.newserialized import serialize, unserialize
37
38 if py3compat.PY3:
39 buffer = memoryview
40
41 #-----------------------------------------------------------------------------
42 # Serialization Functions
43 #-----------------------------------------------------------------------------
44
45 def serialize_object(obj, threshold=64e-6):
46 """Serialize an object into a list of sendable buffers.
47
48 Parameters
49 ----------
50
51 obj : object
52 The object to be serialized
53 threshold : float
54 The threshold for not double-pickling the content.
55
56
57 Returns
58 -------
59 ('pmd', [bufs]) :
60 where pmd is the pickled metadata wrapper,
61 bufs is a list of data buffers
62 """
63 databuffers = []
64 if isinstance(obj, (list, tuple)):
65 clist = canSequence(obj)
66 slist = map(serialize, clist)
67 for s in slist:
68 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
69 databuffers.append(s.getData())
70 s.data = None
71 return pickle.dumps(slist,-1), databuffers
72 elif isinstance(obj, dict):
73 sobj = {}
74 for k in sorted(obj.iterkeys()):
75 s = serialize(can(obj[k]))
76 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
77 databuffers.append(s.getData())
78 s.data = None
79 sobj[k] = s
80 return pickle.dumps(sobj,-1),databuffers
81 else:
82 s = serialize(can(obj))
83 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
84 databuffers.append(s.getData())
85 s.data = None
86 return pickle.dumps(s,-1),databuffers
87
88
89 def unserialize_object(bufs):
90 """reconstruct an object serialized by serialize_object from data buffers."""
91 bufs = list(bufs)
92 sobj = pickle.loads(bufs.pop(0))
93 if isinstance(sobj, (list, tuple)):
94 for s in sobj:
95 if s.data is None:
96 s.data = bufs.pop(0)
97 return uncanSequence(map(unserialize, sobj)), bufs
98 elif isinstance(sobj, dict):
99 newobj = {}
100 for k in sorted(sobj.iterkeys()):
101 s = sobj[k]
102 if s.data is None:
103 s.data = bufs.pop(0)
104 newobj[k] = uncan(unserialize(s))
105 return newobj, bufs
106 else:
107 if sobj.data is None:
108 sobj.data = bufs.pop(0)
109 return uncan(unserialize(sobj)), bufs
110
111 def pack_apply_message(f, args, kwargs, threshold=64e-6):
112 """pack up a function, args, and kwargs to be sent over the wire
113 as a series of buffers. Any object whose data is larger than `threshold`
114 will not have their data copied (currently only numpy arrays support zero-copy)"""
115 msg = [pickle.dumps(can(f),-1)]
116 databuffers = [] # for large objects
117 sargs, bufs = serialize_object(args,threshold)
118 msg.append(sargs)
119 databuffers.extend(bufs)
120 skwargs, bufs = serialize_object(kwargs,threshold)
121 msg.append(skwargs)
122 databuffers.extend(bufs)
123 msg.extend(databuffers)
124 return msg
125
126 def unpack_apply_message(bufs, g=None, copy=True):
127 """unpack f,args,kwargs from buffers packed by pack_apply_message()
128 Returns: original f,args,kwargs"""
129 bufs = list(bufs) # allow us to pop
130 assert len(bufs) >= 3, "not enough buffers!"
131 if not copy:
132 for i in range(3):
133 bufs[i] = bufs[i].bytes
134 cf = pickle.loads(bufs.pop(0))
135 sargs = list(pickle.loads(bufs.pop(0)))
136 skwargs = dict(pickle.loads(bufs.pop(0)))
137 # print sargs, skwargs
138 f = uncan(cf, g)
139 for sa in sargs:
140 if sa.data is None:
141 m = bufs.pop(0)
142 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
143 # always use a buffer, until memoryviews get sorted out
144 sa.data = buffer(m)
145 # disable memoryview support
146 # if copy:
147 # sa.data = buffer(m)
148 # else:
149 # sa.data = m.buffer
150 else:
151 if copy:
152 sa.data = m
153 else:
154 sa.data = m.bytes
155
156 args = uncanSequence(map(unserialize, sargs), g)
157 kwargs = {}
158 for k in sorted(skwargs.iterkeys()):
159 sa = skwargs[k]
160 if sa.data is None:
161 m = bufs.pop(0)
162 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
163 # always use a buffer, until memoryviews get sorted out
164 sa.data = buffer(m)
165 # disable memoryview support
166 # if copy:
167 # sa.data = buffer(m)
168 # else:
169 # sa.data = m.buffer
170 else:
171 if copy:
172 sa.data = m
173 else:
174 sa.data = m.bytes
175
176 kwargs[k] = uncan(unserialize(sa), g)
177
178 return f,args,kwargs
179
@@ -132,6 +132,30 b' class Application(SingletonConfigurable):'
132 132 new = getattr(logging, new)
133 133 self.log_level = new
134 134 self.log.setLevel(new)
135
136 log_format = Unicode("[%(name)s] %(message)s", config=True,
137 help="The Logging format template",
138 )
139 log = Instance(logging.Logger)
140 def _log_default(self):
141 """Start logging for this application.
142
143 The default is to log to stdout using a StreaHandler. The log level
144 starts at loggin.WARN, but this can be adjusted by setting the
145 ``log_level`` attribute.
146 """
147 log = logging.getLogger(self.__class__.__name__)
148 log.setLevel(self.log_level)
149 if sys.executable.endswith('pythonw.exe'):
150 # this should really go to a file, but file-logging is only
151 # hooked up in parallel applications
152 _log_handler = logging.StreamHandler(open(os.devnull, 'w'))
153 else:
154 _log_handler = logging.StreamHandler()
155 _log_formatter = logging.Formatter(self.log_format)
156 _log_handler.setFormatter(_log_formatter)
157 log.addHandler(_log_handler)
158 return log
135 159
136 160 # the alias map for configurables
137 161 aliases = Dict({'log-level' : 'Application.log_level'})
@@ -169,32 +193,11 b' class Application(SingletonConfigurable):'
169 193 if self.__class__ not in self.classes:
170 194 self.classes.insert(0, self.__class__)
171 195
172 self.init_logging()
173
174 196 def _config_changed(self, name, old, new):
175 197 SingletonConfigurable._config_changed(self, name, old, new)
176 198 self.log.debug('Config changed:')
177 199 self.log.debug(repr(new))
178 200
179 def init_logging(self):
180 """Start logging for this application.
181
182 The default is to log to stdout using a StreaHandler. The log level
183 starts at loggin.WARN, but this can be adjusted by setting the
184 ``log_level`` attribute.
185 """
186 self.log = logging.getLogger(self.__class__.__name__)
187 self.log.setLevel(self.log_level)
188 if sys.executable.endswith('pythonw.exe'):
189 # this should really go to a file, but file-logging is only
190 # hooked up in parallel applications
191 self._log_handler = logging.StreamHandler(open(os.devnull, 'w'))
192 else:
193 self._log_handler = logging.StreamHandler()
194 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
195 self._log_handler.setFormatter(self._log_formatter)
196 self.log.addHandler(self._log_handler)
197
198 201 @catch_config_error
199 202 def initialize(self, argv=None):
200 203 """Do the basic steps to configure me.
@@ -403,8 +403,9 b' class HistoryManager(HistoryAccessor):'
403 403 self.save_flag = threading.Event()
404 404 self.db_input_cache_lock = threading.Lock()
405 405 self.db_output_cache_lock = threading.Lock()
406 self.save_thread = HistorySavingThread(self)
407 self.save_thread.start()
406 if self.hist_file != ':memory:':
407 self.save_thread = HistorySavingThread(self)
408 self.save_thread.start()
408 409
409 410 self.new_session()
410 411
@@ -712,6 +712,7 b' class InteractiveShell(SingletonConfigurable, Magic):'
712 712 self._orig_sys_module_state['stderr'] = sys.stderr
713 713 self._orig_sys_module_state['excepthook'] = sys.excepthook
714 714 self._orig_sys_modules_main_name = self.user_module.__name__
715 self._orig_sys_modules_main_mod = sys.modules.get(self.user_module.__name__)
715 716
716 717 def restore_sys_module_state(self):
717 718 """Restore the state of the sys module."""
@@ -721,7 +722,8 b' class InteractiveShell(SingletonConfigurable, Magic):'
721 722 except AttributeError:
722 723 pass
723 724 # Reset what what done in self.init_sys_modules
724 sys.modules[self.user_module.__name__] = self._orig_sys_modules_main_name
725 if self._orig_sys_modules_main_mod is not None:
726 sys.modules[self._orig_sys_modules_main_name] = self._orig_sys_modules_main_mod
725 727
726 728 #-------------------------------------------------------------------------
727 729 # Things related to hooks
@@ -2424,7 +2426,7 b' class InteractiveShell(SingletonConfigurable, Magic):'
2424 2426 self.showtraceback()
2425 2427 warn('Unknown failure executing module: <%s>' % mod_name)
2426 2428
2427 def run_cell(self, raw_cell, store_history=False):
2429 def run_cell(self, raw_cell, store_history=False, silent=False):
2428 2430 """Run a complete IPython cell.
2429 2431
2430 2432 Parameters
@@ -2435,9 +2437,15 b' class InteractiveShell(SingletonConfigurable, Magic):'
2435 2437 If True, the raw and translated cell will be stored in IPython's
2436 2438 history. For user code calling back into IPython's machinery, this
2437 2439 should be set to False.
2440 silent : bool
2441 If True, avoid side-effets, such as implicit displayhooks, history,
2442 and logging. silent=True forces store_history=False.
2438 2443 """
2439 2444 if (not raw_cell) or raw_cell.isspace():
2440 2445 return
2446
2447 if silent:
2448 store_history = False
2441 2449
2442 2450 for line in raw_cell.splitlines():
2443 2451 self.input_splitter.push(line)
@@ -2462,8 +2470,8 b' class InteractiveShell(SingletonConfigurable, Magic):'
2462 2470 if store_history:
2463 2471 self.history_manager.store_inputs(self.execution_count,
2464 2472 cell, raw_cell)
2465
2466 self.logger.log(cell, raw_cell)
2473 if not silent:
2474 self.logger.log(cell, raw_cell)
2467 2475
2468 2476 if not prefilter_failed:
2469 2477 # don't run if prefilter failed
@@ -2483,12 +2491,16 b' class InteractiveShell(SingletonConfigurable, Magic):'
2483 2491 if store_history:
2484 2492 self.execution_count += 1
2485 2493 return None
2486
2494
2495 interactivity = "none" if silent else "last_expr"
2487 2496 self.run_ast_nodes(code_ast.body, cell_name,
2488 interactivity="last_expr")
2489
2497 interactivity=interactivity)
2498
2490 2499 # Execute any registered post-execution functions.
2491 for func, status in self._post_execute.iteritems():
2500 # unless we are silent
2501 post_exec = [] if silent else self._post_execute.iteritems()
2502
2503 for func, status in post_exec:
2492 2504 if self.disable_failing_post_execute and not status:
2493 2505 continue
2494 2506 try:
@@ -254,6 +254,61 b' class InteractiveShellTestCase(unittest.TestCase):'
254 254 self.assertEqual(ip.var_expand(u"{asdf}"), u"{asdf}")
255 255 # ZeroDivisionError
256 256 self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}")
257
258 def test_silent_nopostexec(self):
259 """run_cell(silent=True) doesn't invoke post-exec funcs"""
260 ip = get_ipython()
261
262 d = dict(called=False)
263 def set_called():
264 d['called'] = True
265
266 ip.register_post_execute(set_called)
267 ip.run_cell("1", silent=True)
268 self.assertFalse(d['called'])
269 # double-check that non-silent exec did what we expected
270 # silent to avoid
271 ip.run_cell("1")
272 self.assertTrue(d['called'])
273 # remove post-exec
274 ip._post_execute.pop(set_called)
275
276 def test_silent_noadvance(self):
277 """run_cell(silent=True) doesn't advance execution_count"""
278 ip = get_ipython()
279
280 ec = ip.execution_count
281 # silent should force store_history=False
282 ip.run_cell("1", store_history=True, silent=True)
283
284 self.assertEquals(ec, ip.execution_count)
285 # double-check that non-silent exec did what we expected
286 # silent to avoid
287 ip.run_cell("1", store_history=True)
288 self.assertEquals(ec+1, ip.execution_count)
289
290 def test_silent_nodisplayhook(self):
291 """run_cell(silent=True) doesn't trigger displayhook"""
292 ip = get_ipython()
293
294 d = dict(called=False)
295
296 trap = ip.display_trap
297 save_hook = trap.hook
298
299 def failing_hook(*args, **kwargs):
300 d['called'] = True
301
302 try:
303 trap.hook = failing_hook
304 ip.run_cell("1", silent=True)
305 self.assertFalse(d['called'])
306 # double-check that non-silent exec did what we expected
307 # silent to avoid
308 ip.run_cell("1")
309 self.assertTrue(d['called'])
310 finally:
311 trap.hook = save_hook
257 312
258 313 @skipif(sys.version_info[0] >= 3, "softspace removed in py3")
259 314 def test_print_softspace(self):
@@ -211,7 +211,7 b' class ParalleMagic(Plugin):'
211 211 print '[stdout:%i]'%eid, stdout
212 212
213 213
214 def pxrun_cell(self, raw_cell, store_history=True):
214 def pxrun_cell(self, raw_cell, store_history=False, silent=False):
215 215 """drop-in replacement for InteractiveShell.run_cell.
216 216
217 217 This executes code remotely, instead of in the local namespace.
@@ -258,7 +258,7 b' class ParalleMagic(Plugin):'
258 258 return False
259 259 else:
260 260 try:
261 result = self.active_view.execute(cell, block=False)
261 result = self.active_view.execute(cell, silent=False, block=False)
262 262 except:
263 263 ipself.showtraceback()
264 264 return True
@@ -140,8 +140,6 b' class IPythonConsoleApp(Configurable):'
140 140 frontend_flags = Any(app_flags)
141 141 frontend_aliases = Any(app_aliases)
142 142
143 pure = CBool(False, config=True,
144 help="Use a pure Python kernel instead of an IPython kernel.")
145 143 # create requested profiles by default, if they don't exist:
146 144 auto_create = CBool(True)
147 145 # connection info:
@@ -330,9 +328,7 b' class IPythonConsoleApp(Configurable):'
330 328 )
331 329 # start the kernel
332 330 if not self.existing:
333 kwargs = dict(ipython=not self.pure)
334 kwargs['extra_arguments'] = self.kernel_argv
335 self.kernel_manager.start_kernel(**kwargs)
331 self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv)
336 332 elif self.sshserver:
337 333 # ssh, write new connection file
338 334 self.kernel_manager.write_connection_file()
@@ -45,8 +45,6 b' class DummyIPClusterStart(IPClusterStart):'
45 45
46 46 def init_signal(self):
47 47 pass
48 def init_logging(self):
49 pass
50 48 def reinit_logging(self):
51 49 pass
52 50
@@ -30,6 +30,7 b' from zmq.utils import jsonapi'
30 30 from IPython.external.decorator import decorator
31 31 from IPython.zmq.session import Session
32 32 from IPython.lib.security import passwd_check
33 from IPython.utils.jsonutil import date_default
33 34
34 35 try:
35 36 from docutils.core import publish_string
@@ -385,13 +386,13 b' class ZMQStreamHandler(websocket.WebSocketHandler):'
385 386 except KeyError:
386 387 pass
387 388 msg.pop('buffers')
388 return jsonapi.dumps(msg)
389 return jsonapi.dumps(msg, default=date_default)
389 390
390 391 def _on_zmq_reply(self, msg_list):
391 392 try:
392 393 msg = self._reserialize_reply(msg_list)
393 except:
394 self.application.log.critical("Malformed message: %r" % msg_list)
394 except Exception:
395 self.application.log.critical("Malformed message: %r" % msg_list, exc_info=True)
395 396 else:
396 397 self.write_message(msg)
397 398
@@ -397,7 +397,6 b' class NotebookApp(BaseIPythonApplication):'
397 397 self.cluster_manager.update_profiles()
398 398
399 399 def init_logging(self):
400 super(NotebookApp, self).init_logging()
401 400 # This prevents double log messages because tornado use a root logger that
402 401 # self.log is a child of. The logging module dipatches log messages to a log
403 402 # and all of its ancenstors until propagate is set to False.
@@ -500,6 +499,7 b' class NotebookApp(BaseIPythonApplication):'
500 499
501 500 @catch_config_error
502 501 def initialize(self, argv=None):
502 self.init_logging()
503 503 super(NotebookApp, self).initialize(argv)
504 504 self.init_configurables()
505 505 self.init_webapp()
@@ -101,9 +101,7 b' ipython qtconsole --pylab=inline # start with pylab in inline plotting mode'
101 101 # start with copy of flags
102 102 flags = dict(flags)
103 103 qt_flags = {
104 'pure' : ({'IPythonQtConsoleApp' : {'pure' : True}},
105 "Use a pure Python kernel instead of an IPython kernel."),
106 'plain' : ({'ConsoleWidget' : {'kind' : 'plain'}},
104 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}},
107 105 "Disable rich text support."),
108 106 }
109 107 qt_flags.update(boolean_flag(
@@ -180,18 +178,14 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):'
180 178 plain = CBool(False, config=True,
181 179 help="Use a plaintext widget instead of rich text (plain can't print/save).")
182 180
183 def _pure_changed(self, name, old, new):
184 kind = 'plain' if self.plain else 'rich'
181 def _plain_changed(self, name, old, new):
182 kind = 'plain' if new else 'rich'
185 183 self.config.ConsoleWidget.kind = kind
186 if self.pure:
187 self.widget_factory = FrontendWidget
188 elif self.plain:
184 if new:
189 185 self.widget_factory = IPythonWidget
190 186 else:
191 187 self.widget_factory = RichIPythonWidget
192 188
193 _plain_changed = _pure_changed
194
195 189 # the factory for creating a widget
196 190 widget_factory = Any(RichIPythonWidget)
197 191
@@ -210,7 +204,7 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):'
210 204 config=self.config,
211 205 )
212 206 # start the kernel
213 kwargs = dict(ipython=not self.pure)
207 kwargs = dict()
214 208 kwargs['extra_arguments'] = self.kernel_argv
215 209 kernel_manager.start_kernel(**kwargs)
216 210 kernel_manager.start_channels()
@@ -273,17 +267,13 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):'
273 267 self.window.add_tab_with_frontend(self.widget)
274 268 self.window.init_menu_bar()
275 269
276 self.window.setWindowTitle('Python' if self.pure else 'IPython')
270 self.window.setWindowTitle('IPython')
277 271
278 272 def init_colors(self, widget):
279 273 """Configure the coloring of the widget"""
280 274 # Note: This will be dramatically simplified when colors
281 275 # are removed from the backend.
282 276
283 if self.pure:
284 # only IPythonWidget supports styling
285 return
286
287 277 # parse the colors arg down to current known labels
288 278 try:
289 279 colors = self.config.ZMQInteractiveShell.colors
@@ -46,11 +46,11 b' def get_connection_file(app=None):'
46 46 If unspecified, the currently running app will be used
47 47 """
48 48 if app is None:
49 from IPython.zmq.kernelapp import KernelApp
50 if not KernelApp.initialized():
49 from IPython.zmq.ipkernel import IPKernelApp
50 if not IPKernelApp.initialized():
51 51 raise RuntimeError("app not specified, and not in a running Kernel")
52 52
53 app = KernelApp.instance()
53 app = IPKernelApp.instance()
54 54 return filefind(app.connection_file, ['.', app.profile_dir.security_dir])
55 55
56 56 def find_connection_file(filename, profile=None):
@@ -35,6 +35,28 b' from .client.asyncresult import *'
35 35 from .client.client import Client
36 36 from .client.remotefunction import *
37 37 from .client.view import *
38 from .util import interactive
38 39 from .controller.dependency import *
39 40
41 #-----------------------------------------------------------------------------
42 # Functions
43 #-----------------------------------------------------------------------------
44
45
46 def bind_kernel(**kwargs):
47 """Bind an Engine's Kernel to be used as a full IPython kernel.
48
49 This allows a running Engine to be used simultaneously as a full IPython kernel
50 with the QtConsole or other frontends.
51
52 This function returns immediately.
53 """
54 from IPython.parallel.apps.ipengineapp import IPEngineApp
55 if IPEngineApp.initialized():
56 app = IPEngineApp.instance()
57 else:
58 raise RuntimeError("Must be called from an IPEngineApp instance")
59
60 return app.bind_kernel(**kwargs)
61
40 62
@@ -102,6 +102,10 b' class BaseParallelApplication(BaseIPythonApplication):'
102 102 def _log_level_default(self):
103 103 # temporarily override default_log_level to INFO
104 104 return logging.INFO
105
106 def _log_format_default(self):
107 """override default log format to include time"""
108 return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s"
105 109
106 110 work_dir = Unicode(os.getcwdu(), config=True,
107 111 help='Set the working dir for the process.'
@@ -175,11 +179,14 b' class BaseParallelApplication(BaseIPythonApplication):'
175 179 else:
176 180 open_log_file = None
177 181 if open_log_file is not None:
178 self.log.removeHandler(self._log_handler)
182 while self.log.handlers:
183 self.log.removeHandler(self.log.handlers[0])
179 184 self._log_handler = logging.StreamHandler(open_log_file)
180 185 self.log.addHandler(self._log_handler)
186 else:
187 self._log_handler = self.log.handlers[0]
181 188 # Add timestamps to log format:
182 self._log_formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
189 self._log_formatter = logging.Formatter(self.log_format,
183 190 datefmt="%Y-%m-%d %H:%M:%S")
184 191 self._log_handler.setFormatter(self._log_formatter)
185 192 # do not propagate log messages to root logger
@@ -434,11 +434,9 b' class IPControllerApp(BaseParallelApplication):'
434 434 lsock = context.socket(zmq.PUB)
435 435 lsock.connect(self.log_url)
436 436 handler = PUBHandler(lsock)
437 self.log.removeHandler(self._log_handler)
438 437 handler.root_topic = 'controller'
439 438 handler.setLevel(self.log_level)
440 439 self.log.addHandler(handler)
441 self._log_handler = handler
442 440
443 441 @catch_config_error
444 442 def initialize(self, argv=None):
@@ -37,6 +37,7 b' from IPython.parallel.apps.baseapp import ('
37 37 catch_config_error,
38 38 )
39 39 from IPython.zmq.log import EnginePUBHandler
40 from IPython.zmq.ipkernel import Kernel, IPKernelApp
40 41 from IPython.zmq.session import (
41 42 Session, session_aliases, session_flags
42 43 )
@@ -44,11 +45,11 b' from IPython.zmq.session import ('
44 45 from IPython.config.configurable import Configurable
45 46
46 47 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.streamkernel import Kernel
48 from IPython.parallel.util import disambiguate_url, asbytes
48 from IPython.parallel.util import disambiguate_url
49 49
50 50 from IPython.utils.importstring import import_item
51 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
51 from IPython.utils.py3compat import cast_bytes
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance
52 53
53 54
54 55 #-----------------------------------------------------------------------------
@@ -173,9 +174,17 b' class IPEngineApp(BaseParallelApplication):'
173 174 log_url = Unicode('', config=True,
174 175 help="""The URL for the iploggerapp instance, for forwarding
175 176 logging to a central location.""")
177
178 # an IPKernelApp instance, used to setup listening for shell frontends
179 kernel_app = Instance(IPKernelApp)
176 180
177 181 aliases = Dict(aliases)
178 182 flags = Dict(flags)
183
184 @property
185 def kernel(self):
186 """allow access to the Kernel object, so I look like IPKernelApp"""
187 return self.engine.kernel
179 188
180 189 def find_url_file(self):
181 190 """Set the url file.
@@ -203,7 +212,7 b' class IPEngineApp(BaseParallelApplication):'
203 212 d = json.loads(f.read())
204 213
205 214 if 'exec_key' in d:
206 config.Session.key = asbytes(d['exec_key'])
215 config.Session.key = cast_bytes(d['exec_key'])
207 216
208 217 try:
209 218 config.EngineFactory.location
@@ -220,7 +229,47 b' class IPEngineApp(BaseParallelApplication):'
220 229 config.EngineFactory.sshserver
221 230 except AttributeError:
222 231 config.EngineFactory.sshserver = d['ssh']
232
233 def bind_kernel(self, **kwargs):
234 """Promote engine to listening kernel, accessible to frontends."""
235 if self.kernel_app is not None:
236 return
237
238 self.log.info("Opening ports for direct connections as an IPython kernel")
239
240 kernel = self.kernel
241
242 kwargs.setdefault('config', self.config)
243 kwargs.setdefault('log', self.log)
244 kwargs.setdefault('profile_dir', self.profile_dir)
245 kwargs.setdefault('session', self.engine.session)
246
247 app = self.kernel_app = IPKernelApp(**kwargs)
248
249 # allow IPKernelApp.instance():
250 IPKernelApp._instance = app
223 251
252 app.init_connection_file()
253 # relevant contents of init_sockets:
254
255 app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
256 app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)
257
258 app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
259 app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)
260
261 kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
262 app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
263 app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)
264
265 # start the heartbeat, and log connection info:
266
267 app.init_heartbeat()
268
269 app.log_connection_info()
270 app.write_connection_file()
271
272
224 273 def init_engine(self):
225 274 # This is the working dir by now.
226 275 sys.path.insert(0, '')
@@ -282,11 +331,9 b' class IPEngineApp(BaseParallelApplication):'
282 331 context = self.engine.context
283 332 lsock = context.socket(zmq.PUB)
284 333 lsock.connect(self.log_url)
285 self.log.removeHandler(self._log_handler)
286 334 handler = EnginePUBHandler(self.engine, lsock)
287 335 handler.setLevel(self.log_level)
288 336 self.log.addHandler(handler)
289 self._log_handler = handler
290 337
291 338 def init_mpi(self):
292 339 global mpi
@@ -23,6 +23,7 b' import copy'
23 23 import logging
24 24 import os
25 25 import stat
26 import sys
26 27 import time
27 28
28 29 # signal imports, handling various platforms, versions
@@ -59,8 +60,8 b' from IPython.utils.text import EvalFormatter'
59 60 from IPython.utils.traitlets import (
60 61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
61 62 )
62 from IPython.utils.path import get_ipython_module_path, get_home_dir
63 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63 from IPython.utils.path import get_home_dir
64 from IPython.utils.process import find_cmd, FindCmdError
64 65
65 66 from .win32support import forward_read_events
66 67
@@ -72,18 +73,13 b" WINDOWS = os.name == 'nt'"
72 73 # Paths to the kernel apps
73 74 #-----------------------------------------------------------------------------
74 75
76 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
75 77
76 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
77 'IPython.parallel.apps.ipclusterapp'
78 ))
78 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
79 79
80 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
81 'IPython.parallel.apps.ipengineapp'
82 ))
80 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
83 81
84 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
85 'IPython.parallel.apps.ipcontrollerapp'
86 ))
82 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
87 83
88 84 #-----------------------------------------------------------------------------
89 85 # Base launchers and errors
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication'
36 36 from IPython.utils.jsonutil import rekey
37 37 from IPython.utils.localinterfaces import LOCAL_IPS
38 38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.py3compat import cast_bytes
39 40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 41 Dict, List, Bool, Set, Any)
41 42 from IPython.external.decorator import decorator
@@ -71,6 +72,60 b' def spin_first(f, self, *args, **kwargs):'
71 72 # Classes
72 73 #--------------------------------------------------------------------------
73 74
75
76 class ExecuteReply(object):
77 """wrapper for finished Execute results"""
78 def __init__(self, msg_id, content, metadata):
79 self.msg_id = msg_id
80 self._content = content
81 self.execution_count = content['execution_count']
82 self.metadata = metadata
83
84 def __getitem__(self, key):
85 return self.metadata[key]
86
87 def __getattr__(self, key):
88 if key not in self.metadata:
89 raise AttributeError(key)
90 return self.metadata[key]
91
92 def __repr__(self):
93 pyout = self.metadata['pyout'] or {}
94 text_out = pyout.get('data', {}).get('text/plain', '')
95 if len(text_out) > 32:
96 text_out = text_out[:29] + '...'
97
98 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
99
100 def _repr_html_(self):
101 pyout = self.metadata['pyout'] or {'data':{}}
102 return pyout['data'].get("text/html")
103
104 def _repr_latex_(self):
105 pyout = self.metadata['pyout'] or {'data':{}}
106 return pyout['data'].get("text/latex")
107
108 def _repr_json_(self):
109 pyout = self.metadata['pyout'] or {'data':{}}
110 return pyout['data'].get("application/json")
111
112 def _repr_javascript_(self):
113 pyout = self.metadata['pyout'] or {'data':{}}
114 return pyout['data'].get("application/javascript")
115
116 def _repr_png_(self):
117 pyout = self.metadata['pyout'] or {'data':{}}
118 return pyout['data'].get("image/png")
119
120 def _repr_jpeg_(self):
121 pyout = self.metadata['pyout'] or {'data':{}}
122 return pyout['data'].get("image/jpeg")
123
124 def _repr_svg_(self):
125 pyout = self.metadata['pyout'] or {'data':{}}
126 return pyout['data'].get("image/svg+xml")
127
128
74 129 class Metadata(dict):
75 130 """Subclass of dict for initializing metadata values.
76 131
@@ -97,6 +152,7 b' class Metadata(dict):'
97 152 'pyerr' : None,
98 153 'stdout' : '',
99 154 'stderr' : '',
155 'outputs' : [],
100 156 }
101 157 self.update(md)
102 158 self.update(dict(*args, **kwargs))
@@ -308,15 +364,19 b' class Client(HasTraits):'
308 364 if self._cd is not None:
309 365 if url_or_file is None:
310 366 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
311 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
312 " Please specify at least one of url_or_file or profile."
367 if url_or_file is None:
368 raise ValueError(
369 "I can't find enough information to connect to a hub!"
370 " Please specify at least one of url_or_file or profile."
371 )
313 372
314 373 if not util.is_url(url_or_file):
315 374 # it's not a url, try for a file
316 375 if not os.path.exists(url_or_file):
317 376 if self._cd:
318 377 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
319 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
378 if not os.path.exists(url_or_file):
379 raise IOError("Connection file not found: %r" % url_or_file)
320 380 with open(url_or_file) as f:
321 381 cfg = json.loads(f.read())
322 382 else:
@@ -369,7 +429,7 b' class Client(HasTraits):'
369 429 if os.path.isfile(exec_key):
370 430 extra_args['keyfile'] = exec_key
371 431 else:
372 exec_key = util.asbytes(exec_key)
432 exec_key = cast_bytes(exec_key)
373 433 extra_args['key'] = exec_key
374 434 self.session = Session(**extra_args)
375 435
@@ -467,7 +527,7 b' class Client(HasTraits):'
467 527 if not isinstance(targets, (tuple, list, xrange)):
468 528 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
469 529
470 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
530 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
471 531
472 532 def _connect(self, sshserver, ssh_kwargs, timeout):
473 533 """setup all our socket connections to the cluster. This is called from
@@ -628,7 +688,30 b' class Client(HasTraits):'
628 688 print ("got unknown result: %s"%msg_id)
629 689 else:
630 690 self.outstanding.remove(msg_id)
631 self.results[msg_id] = self._unwrap_exception(msg['content'])
691
692 content = msg['content']
693 header = msg['header']
694
695 # construct metadata:
696 md = self.metadata[msg_id]
697 md.update(self._extract_metadata(header, parent, content))
698 # is this redundant?
699 self.metadata[msg_id] = md
700
701 e_outstanding = self._outstanding_dict[md['engine_uuid']]
702 if msg_id in e_outstanding:
703 e_outstanding.remove(msg_id)
704
705 # construct result:
706 if content['status'] == 'ok':
707 self.results[msg_id] = ExecuteReply(msg_id, content, md)
708 elif content['status'] == 'aborted':
709 self.results[msg_id] = error.TaskAborted(msg_id)
710 elif content['status'] == 'resubmitted':
711 # TODO: handle resubmission
712 pass
713 else:
714 self.results[msg_id] = self._unwrap_exception(content)
632 715
633 716 def _handle_apply_reply(self, msg):
634 717 """Save the reply to an apply_request into our results."""
@@ -750,8 +833,13 b' class Client(HasTraits):'
750 833 md.update({'pyerr' : self._unwrap_exception(content)})
751 834 elif msg_type == 'pyin':
752 835 md.update({'pyin' : content['code']})
836 elif msg_type == 'display_data':
837 md['outputs'].append(content)
838 elif msg_type == 'pyout':
839 md['pyout'] = content
753 840 else:
754 md.update({msg_type : content.get('data', '')})
841 # unhandled msg_type (status, etc.)
842 pass
755 843
756 844 # reduntant?
757 845 self.metadata[msg_id] = md
@@ -848,14 +936,14 b' class Client(HasTraits):'
848 936 """
849 937 if self._notification_socket:
850 938 self._flush_notifications()
939 if self._iopub_socket:
940 self._flush_iopub(self._iopub_socket)
851 941 if self._mux_socket:
852 942 self._flush_results(self._mux_socket)
853 943 if self._task_socket:
854 944 self._flush_results(self._task_socket)
855 945 if self._control_socket:
856 946 self._flush_control(self._control_socket)
857 if self._iopub_socket:
858 self._flush_iopub(self._iopub_socket)
859 947 if self._query_socket:
860 948 self._flush_ignored_hub_replies()
861 949
@@ -1024,14 +1112,16 b' class Client(HasTraits):'
1024 1112
1025 1113 return result
1026 1114
1027 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1115 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1028 1116 ident=None):
1029 1117 """construct and send an apply message via a socket.
1030 1118
1031 1119 This is the principal method with which all engine execution is performed by views.
1032 1120 """
1033 1121
1034 assert not self._closed, "cannot use me anymore, I'm closed!"
1122 if self._closed:
1123 raise RuntimeError("Client cannot be used after its sockets have been closed")
1124
1035 1125 # defaults:
1036 1126 args = args if args is not None else []
1037 1127 kwargs = kwargs if kwargs is not None else {}
@@ -1066,6 +1156,43 b' class Client(HasTraits):'
1066 1156
1067 1157 return msg
1068 1158
1159 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1160 """construct and send an execute request via a socket.
1161
1162 """
1163
1164 if self._closed:
1165 raise RuntimeError("Client cannot be used after its sockets have been closed")
1166
1167 # defaults:
1168 subheader = subheader if subheader is not None else {}
1169
1170 # validate arguments
1171 if not isinstance(code, basestring):
1172 raise TypeError("code must be text, not %s" % type(code))
1173 if not isinstance(subheader, dict):
1174 raise TypeError("subheader must be dict, not %s" % type(subheader))
1175
1176 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1177
1178
1179 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1180 subheader=subheader)
1181
1182 msg_id = msg['header']['msg_id']
1183 self.outstanding.add(msg_id)
1184 if ident:
1185 # possibly routed to a specific engine
1186 if isinstance(ident, list):
1187 ident = ident[-1]
1188 if ident in self._engines.values():
1189 # save for later, in case of engine death
1190 self._outstanding_dict[ident].add(msg_id)
1191 self.history.append(msg_id)
1192 self.metadata[msg_id]['submitted'] = datetime.now()
1193
1194 return msg
1195
1069 1196 #--------------------------------------------------------------------------
1070 1197 # construct a View object
1071 1198 #--------------------------------------------------------------------------
@@ -1221,12 +1348,6 b' class Client(HasTraits):'
1221 1348 raise TypeError("indices must be str or int, not %r"%id)
1222 1349 theids.append(id)
1223 1350
1224 for msg_id in theids:
1225 self.outstanding.discard(msg_id)
1226 if msg_id in self.history:
1227 self.history.remove(msg_id)
1228 self.results.pop(msg_id, None)
1229 self.metadata.pop(msg_id, None)
1230 1351 content = dict(msg_ids = theids)
1231 1352
1232 1353 self.session.send(self._query_socket, 'resubmit_request', content)
@@ -1238,8 +1359,10 b' class Client(HasTraits):'
1238 1359 content = msg['content']
1239 1360 if content['status'] != 'ok':
1240 1361 raise self._unwrap_exception(content)
1362 mapping = content['resubmitted']
1363 new_ids = [ mapping[msg_id] for msg_id in theids ]
1241 1364
1242 ar = AsyncHubResult(self, msg_ids=theids)
1365 ar = AsyncHubResult(self, msg_ids=new_ids)
1243 1366
1244 1367 if block:
1245 1368 ar.wait()
@@ -128,13 +128,20 b' class View(HasTraits):'
128 128
129 129 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
130 130
131
132 131 def __repr__(self):
133 132 strtargets = str(self.targets)
134 133 if len(strtargets) > 16:
135 134 strtargets = strtargets[:12]+'...]'
136 135 return "<%s %s>"%(self.__class__.__name__, strtargets)
137
136
137 def __len__(self):
138 if isinstance(self.targets, list):
139 return len(self.targets)
140 elif isinstance(self.targets, int):
141 return 1
142 else:
143 return len(self.client)
144
138 145 def set_flags(self, **kwargs):
139 146 """set my attribute flags by keyword.
140 147
@@ -195,7 +202,7 b' class View(HasTraits):'
195 202 @sync_results
196 203 @save_ids
197 204 def _really_apply(self, f, args, kwargs, block=None, **options):
198 """wrapper for client.send_apply_message"""
205 """wrapper for client.send_apply_request"""
199 206 raise NotImplementedError("Implement in subclasses")
200 207
201 208 def apply(self, f, *args, **kwargs):
@@ -533,7 +540,7 b' class DirectView(View):'
533 540 msg_ids = []
534 541 trackers = []
535 542 for ident in _idents:
536 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
543 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
537 544 ident=ident)
538 545 if track:
539 546 trackers.append(msg['tracker'])
@@ -547,6 +554,7 b' class DirectView(View):'
547 554 pass
548 555 return ar
549 556
557
550 558 @spin_after
551 559 def map(self, f, *sequences, **kwargs):
552 560 """view.map(f, *sequences, block=self.block) => list|AsyncMapResult
@@ -590,7 +598,9 b' class DirectView(View):'
590 598 pf = ParallelFunction(self, f, block=block, **kwargs)
591 599 return pf.map(*sequences)
592 600
593 def execute(self, code, targets=None, block=None):
601 @sync_results
602 @save_ids
603 def execute(self, code, silent=True, targets=None, block=None):
594 604 """Executes `code` on `targets` in blocking or nonblocking manner.
595 605
596 606 ``execute`` is always `bound` (affects engine namespace)
@@ -604,7 +614,22 b' class DirectView(View):'
604 614 whether or not to wait until done to return
605 615 default: self.block
606 616 """
607 return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
617 block = self.block if block is None else block
618 targets = self.targets if targets is None else targets
619
620 _idents = self.client._build_targets(targets)[0]
621 msg_ids = []
622 trackers = []
623 for ident in _idents:
624 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
625 msg_ids.append(msg['header']['msg_id'])
626 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
627 if block:
628 try:
629 ar.get()
630 except KeyboardInterrupt:
631 pass
632 return ar
608 633
609 634 def run(self, filename, targets=None, block=None):
610 635 """Execute contents of `filename` on my engine(s).
@@ -996,7 +1021,7 b' class LoadBalancedView(View):'
996 1021 follow = self._render_dependency(follow)
997 1022 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
998 1023
999 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
1024 msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track,
1000 1025 subheader=subheader)
1001 1026 tracker = None if track is False else msg['tracker']
1002 1027
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice'
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.py3compat import str_to_bytes
26 27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 28
28 from IPython.parallel.util import asbytes, log_errors
29 from IPython.parallel.util import log_errors
29 30
30 31 class Heart(object):
31 32 """A basic heart object for responding to a HeartMonitor.
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):'
123 124 self.responses = set()
124 125 # print self.on_probation, self.hearts
125 126 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 self.pingstream.send(asbytes(str(self.lifetime)))
127 self.pingstream.send(str_to_bytes(str(self.lifetime)))
127 128 # flush stream to force immediate socket send
128 129 self.pingstream.flush()
129 130
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):'
151 152 @log_errors
152 153 def handle_pong(self, msg):
153 154 "a heart just beat"
154 current = asbytes(str(self.lifetime))
155 last = asbytes(str(self.last_ping))
155 current = str_to_bytes(str(self.lifetime))
156 last = str_to_bytes(str(self.last_ping))
156 157 if msg[1] == current:
157 158 delta = time.time()-self.tic
158 159 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream'
28 28
29 29 # internal:
30 30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 32 from IPython.utils.traitlets import (
32 33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 34 )
@@ -441,7 +442,7 b' class Hub(SessionFactory):'
441 442 for t in targets:
442 443 # map raw identities to ids
443 444 if isinstance(t, (str,unicode)):
444 t = self.by_ident.get(t, t)
445 t = self.by_ident.get(cast_bytes(t), t)
445 446 _targets.append(t)
446 447 targets = _targets
447 448 bad_targets = [ t for t in targets if t not in self.ids ]
@@ -467,13 +468,13 b' class Hub(SessionFactory):'
467 468 except ValueError:
468 469 idents=[]
469 470 if not idents:
470 self.log.error("Bad Monitor Message: %r", msg)
471 self.log.error("Monitor message without topic: %r", msg)
471 472 return
472 473 handler = self.monitor_handlers.get(switch, None)
473 474 if handler is not None:
474 475 handler(idents, msg)
475 476 else:
476 self.log.error("Invalid monitor topic: %r", switch)
477 self.log.error("Unrecognized monitor topic: %r", switch)
477 478
478 479
479 480 @util.log_errors
@@ -719,15 +720,18 b' class Hub(SessionFactory):'
719 720 self.unassigned.remove(msg_id)
720 721
721 722 header = msg['header']
722 engine_uuid = header.get('engine', None)
723 eid = self.by_ident.get(engine_uuid, None)
723 engine_uuid = header.get('engine', u'')
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
725
726 status = header.get('status', None)
724 727
725 728 if msg_id in self.pending:
726 729 self.log.info("task::task %r finished on %s", msg_id, eid)
727 730 self.pending.remove(msg_id)
728 731 self.all_completed.add(msg_id)
729 732 if eid is not None:
730 self.completed[eid].append(msg_id)
733 if status != 'aborted':
734 self.completed[eid].append(msg_id)
731 735 if msg_id in self.tasks[eid]:
732 736 self.tasks[eid].remove(msg_id)
733 737 completed = header['date']
@@ -760,7 +764,7 b' class Hub(SessionFactory):'
760 764 # print (content)
761 765 msg_id = content['msg_id']
762 766 engine_uuid = content['engine_id']
763 eid = self.by_ident[util.asbytes(engine_uuid)]
767 eid = self.by_ident[cast_bytes(engine_uuid)]
764 768
765 769 self.log.info("task::task %r arrived on %r", msg_id, eid)
766 770 if msg_id in self.unassigned:
@@ -796,7 +800,7 b' class Hub(SessionFactory):'
796 800
797 801 parent = msg['parent_header']
798 802 if not parent:
799 self.log.error("iopub::invalid IOPub message: %r", msg)
803 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
800 804 return
801 805 msg_id = parent['msg_id']
802 806 msg_type = msg['header']['msg_type']
@@ -850,13 +854,13 b' class Hub(SessionFactory):'
850 854 """Register a new engine."""
851 855 content = msg['content']
852 856 try:
853 queue = util.asbytes(content['queue'])
857 queue = cast_bytes(content['queue'])
854 858 except KeyError:
855 859 self.log.error("registration::queue not specified", exc_info=True)
856 860 return
857 861 heart = content.get('heartbeat', None)
858 862 if heart:
859 heart = util.asbytes(heart)
863 heart = cast_bytes(heart)
860 864 """register a new engine, and create the socket(s) necessary"""
861 865 eid = self._next_id
862 866 # print (eid, queue, reg, heart)
@@ -1130,7 +1134,7 b' class Hub(SessionFactory):'
1130 1134
1131 1135 # validate msg_ids
1132 1136 found_ids = [ rec['msg_id'] for rec in records ]
1133 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1134 1138 if len(records) > len(msg_ids):
1135 1139 try:
1136 1140 raise RuntimeError("DB appears to be in an inconsistent state."
@@ -1143,40 +1147,46 b' class Hub(SessionFactory):'
1143 1147 raise KeyError("No such msg(s): %r" % missing)
1144 1148 except KeyError:
1145 1149 return finish(error.wrap_exception())
1146 elif invalid_ids:
1147 msg_id = invalid_ids[0]
1150 elif pending_ids:
1151 pass
1152 # no need to raise on resubmit of pending task, now that we
1153 # resubmit under new ID, but do we want to raise anyway?
1154 # msg_id = invalid_ids[0]
1155 # try:
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 # except Exception:
1158 # return finish(error.wrap_exception())
1159
1160 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1162
1163 # send the messages
1164 for rec in records:
1165 header = rec['header']
1166 msg = self.session.msg(header['msg_type'])
1167 msg_id = msg['msg_id']
1168 msg['content'] = rec['content']
1169 header.update(msg['header'])
1170 msg['header'] = header
1171
1172 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1173
1174 resubmitted[rec['msg_id']] = msg_id
1175 self.pending.add(msg_id)
1176 msg['buffers'] = []
1148 1177 try:
1149 raise ValueError("Task %r appears to be inflight" % msg_id)
1178 self.db.add_record(msg_id, init_record(msg))
1150 1179 except Exception:
1151 return finish(error.wrap_exception())
1180 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1152 1181
1153 # clear the existing records
1154 now = datetime.now()
1155 rec = empty_record()
1156 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1157 rec['resubmitted'] = now
1158 rec['queue'] = 'task'
1159 rec['client_uuid'] = client_id[0]
1160 try:
1161 for msg_id in msg_ids:
1162 self.all_completed.discard(msg_id)
1163 self.db.update_record(msg_id, rec)
1164 except Exception:
1165 self.log.error('db::db error upating record', exc_info=True)
1166 reply = error.wrap_exception()
1167 else:
1168 # send the messages
1169 for rec in records:
1170 header = rec['header']
1171 # include resubmitted in header to prevent digest collision
1172 header['resubmitted'] = now
1173 msg = self.session.msg(header['msg_type'])
1174 msg['content'] = rec['content']
1175 msg['header'] = header
1176 msg['header']['msg_id'] = rec['msg_id']
1177 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1178
1179 finish(dict(status='ok'))
1182 finish(dict(status='ok', resubmitted=resubmitted))
1183
1184 # store the new IDs in the Task DB
1185 for msg_id, resubmit_id in resubmitted.iteritems():
1186 try:
1187 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1188 except Exception:
1189 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1180 1190
1181 1191
1182 1192 def _extract_record(self, rec):
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator'
42 42 from IPython.config.application import Application
43 43 from IPython.config.loader import Config
44 44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45 from IPython.utils.py3compat import cast_bytes
45 46
46 47 from IPython.parallel import error, util
47 48 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger, asbytes
49 from IPython.parallel.util import connect_logger, local_logger
49 50
50 51 from .dependency import Dependency
51 52
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):'
262 263 self.log.error("Unhandled message type: %r"%msg_type)
263 264 else:
264 265 try:
265 handler(asbytes(msg['content']['queue']))
266 handler(cast_bytes(msg['content']['queue']))
266 267 except Exception:
267 268 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
268 269
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):'
316 317 # prevent double-handling of messages
317 318 continue
318 319
319 raw_msg = lost[msg_id][0]
320 raw_msg = lost[msg_id].raw_msg
320 321 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 322 parent = self.session.unpack(msg[1].bytes)
322 323 idents = [engine, idents[0]]
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):'
370 371 # get targets as a set of bytes objects
371 372 # from a list of unicode objects
372 373 targets = header.get('targets', [])
373 targets = map(asbytes, targets)
374 targets = map(cast_bytes, targets)
374 375 targets = set(targets)
375 376
376 377 retries = header.get('retries', 0)
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):'
138 138 'engine_uuid' : 'text',
139 139 'started' : 'timestamp',
140 140 'completed' : 'timestamp',
141 'resubmitted' : 'timestamp',
141 'resubmitted' : 'text',
142 142 'received' : 'timestamp',
143 143 'result_header' : 'dict text',
144 144 'result_content' : 'dict text',
@@ -226,15 +226,16 b' class SQLiteDB(BaseDB):'
226 226 # isolation_level = None)#,
227 227 cached_statements=64)
228 228 # print dir(self._db)
229 first_table = self.table
229 first_table = previous_table = self.table
230 230 i=0
231 231 while not self._check_table():
232 232 i+=1
233 233 self.table = first_table+'_%i'%i
234 234 self.log.warn(
235 235 "Table %s exists and doesn't match db format, trying %s"%
236 (first_table,self.table)
236 (previous_table, self.table)
237 237 )
238 previous_table = self.table
238 239
239 240 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
240 241 (msg_id text PRIMARY KEY,
@@ -246,7 +247,7 b' class SQLiteDB(BaseDB):'
246 247 engine_uuid text,
247 248 started timestamp,
248 249 completed timestamp,
249 resubmitted timestamp,
250 resubmitted text,
250 251 received timestamp,
251 252 result_header dict text,
252 253 result_content dict text,
@@ -27,15 +27,14 b' from IPython.external.ssh import tunnel'
27 27 from IPython.utils.traitlets import (
28 28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 29 )
30 from IPython.utils import py3compat
30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 from IPython.parallel.util import disambiguate_url, asbytes
34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37
38 from .streamkernel import Kernel
37 from IPython.zmq.ipkernel import Kernel
39 38
40 39 class EngineFactory(RegistrationFactory):
41 40 """IPython engine"""
@@ -70,7 +69,7 b' class EngineFactory(RegistrationFactory):'
70 69 bident = CBytes()
71 70 ident = Unicode()
72 71 def _ident_changed(self, name, old, new):
73 self.bident = asbytes(new)
72 self.bident = cast_bytes(new)
74 73 using_ssh=Bool(False)
75 74
76 75
@@ -185,23 +184,27 b' class EngineFactory(RegistrationFactory):'
185 184
186 185 # create iopub stream:
187 186 iopub_addr = msg.content.iopub
188 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
189 iopub_stream.setsockopt(zmq.IDENTITY, identity)
190 connect(iopub_stream, iopub_addr)
191
192 # # Redirect input streams and set a display hook.
187 iopub_socket = ctx.socket(zmq.PUB)
188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 connect(iopub_socket, iopub_addr)
190
191 # disable history:
192 self.config.HistoryManager.hist_file = ':memory:'
193
194 # Redirect input streams and set a display hook.
193 195 if self.out_stream_factory:
194 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
195 sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id)
196 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
197 sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id)
196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
198 200 if self.display_hook_factory:
199 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
200 sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id)
201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
201 203
202 204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
203 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
204 loop=loop, user_ns = self.user_ns, log=self.log)
205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
206 loop=loop, user_ns=self.user_ns, log=self.log)
207 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
205 208 self.kernel.start()
206 209
207 210
@@ -65,7 +65,7 b' def setup():'
65 65 if cp.poll() is not None:
66 66 print cp.poll()
67 67 raise RuntimeError("The test controller failed to start.")
68 elif time.time()-tic > 10:
68 elif time.time()-tic > 15:
69 69 raise RuntimeError("Timeout waiting for the test controller to start.")
70 70 time.sleep(0.1)
71 71 add_engines(1)
@@ -93,7 +93,7 b" def add_engines(n=1, profile='iptest', total=False):"
93 93 while len(rc) < base+n:
94 94 if any([ ep.poll() is not None for ep in eps ]):
95 95 raise RuntimeError("A test engine failed to start.")
96 elif time.time()-tic > 10:
96 elif time.time()-tic > 15:
97 97 raise RuntimeError("Timeout waiting for engines to connect.")
98 98 time.sleep(.1)
99 99 rc.spin()
@@ -154,7 +154,7 b' class AsyncResultTest(ClusterTestCase):'
154 154 ar = v.apply_async(time.sleep, 0.25)
155 155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
156 156 ar.get(2)
157 self.assertTrue(ar.serial_time < 0.5)
157 self.assertTrue(ar.serial_time < 1.)
158 158 self.assertTrue(ar.serial_time > 0.2)
159 159
160 160 def test_serial_time_multi(self):
@@ -171,8 +171,8 b' class AsyncResultTest(ClusterTestCase):'
171 171 ar = v.apply_async(time.sleep, 0.25)
172 172 while not ar.ready():
173 173 time.sleep(0.01)
174 self.assertTrue(ar.elapsed < 0.3)
175 self.assertTrue(ar.elapsed < 0.3)
174 self.assertTrue(ar.elapsed < 1)
175 self.assertTrue(ar.elapsed < 1)
176 176 ar.get(2)
177 177
178 178 def test_elapsed_multi(self):
@@ -180,8 +180,8 b' class AsyncResultTest(ClusterTestCase):'
180 180 ar = v.apply_async(time.sleep, 0.25)
181 181 while not ar.ready():
182 182 time.sleep(0.01)
183 self.assertTrue(ar.elapsed < 0.3)
184 self.assertTrue(ar.elapsed < 0.3)
183 self.assertTrue(ar.elapsed < 1)
184 self.assertTrue(ar.elapsed < 1)
185 185 ar.get(2)
186 186
187 187 def test_hubresult_timestamps(self):
@@ -280,6 +280,25 b' class TestClient(ClusterTestCase):'
280 280 time.sleep(0.25)
281 281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282 282
283 def _wait_for_idle(self):
284 """wait for an engine to become idle, according to the Hub"""
285 rc = self.client
286
287 # timeout 2s, polling every 100ms
288 for i in range(20):
289 qs = rc.queue_status()
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 time.sleep(0.1)
292 else:
293 break
294
295 # ensure Hub up to date:
296 qs = rc.queue_status()
297 self.assertEquals(qs['unassigned'], 0)
298 for eid in rc.ids:
299 self.assertEquals(qs[eid]['tasks'], 0)
300
301
283 302 def test_resubmit(self):
284 303 def f():
285 304 import random
@@ -288,19 +307,38 b' class TestClient(ClusterTestCase):'
288 307 ar = v.apply_async(f)
289 308 r1 = ar.get(1)
290 309 # give the Hub a chance to notice:
291 time.sleep(0.5)
310 self._wait_for_idle()
292 311 ahr = self.client.resubmit(ar.msg_ids)
293 312 r2 = ahr.get(1)
294 313 self.assertFalse(r1 == r2)
295 314
315 def test_resubmit_aborted(self):
316 def f():
317 import random
318 return random.random()
319 v = self.client.load_balanced_view()
320 # restrict to one engine, so we can put a sleep
321 # ahead of the task, so it will get aborted
322 eid = self.client.ids[-1]
323 v.targets = [eid]
324 sleep = v.apply_async(time.sleep, 0.5)
325 ar = v.apply_async(f)
326 ar.abort()
327 self.assertRaises(error.TaskAborted, ar.get)
328 # Give the Hub a chance to get up to date:
329 self._wait_for_idle()
330 ahr = self.client.resubmit(ar.msg_ids)
331 r2 = ahr.get(1)
332
296 333 def test_resubmit_inflight(self):
297 """ensure ValueError on resubmit of inflight task"""
334 """resubmit of inflight task"""
298 335 v = self.client.load_balanced_view()
299 336 ar = v.apply_async(time.sleep,1)
300 337 # give the message a chance to arrive
301 338 time.sleep(0.2)
302 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
339 ahr = self.client.resubmit(ar.msg_ids)
303 340 ar.get(2)
341 ahr.get(2)
304 342
305 343 def test_resubmit_badkey(self):
306 344 """ensure KeyError on resubmit of nonexistant task"""
@@ -18,6 +18,7 b' Authors:'
18 18
19 19 from __future__ import division
20 20
21 import logging
21 22 import os
22 23 import tempfile
23 24 import time
@@ -226,7 +227,9 b' class TestSQLiteBackend(TestDictBackend):'
226 227 @dec.skip_without('sqlite3')
227 228 def create_db(self):
228 229 location, fname = os.path.split(temp_db)
229 return SQLiteDB(location=location, fname=fname)
230 log = logging.getLogger('test')
231 log.setLevel(logging.CRITICAL)
232 return SQLiteDB(location=location, fname=fname, log=log)
230 233
231 234 def tearDown(self):
232 235 self.db._db.close()
@@ -25,6 +25,7 b' import zmq'
25 25 from nose import SkipTest
26 26
27 27 from IPython.testing import decorators as dec
28 from IPython.testing.ipunittest import ParametricTestCase
28 29
29 30 from IPython import parallel as pmod
30 31 from IPython.parallel import error
@@ -39,7 +40,7 b' from .clienttest import ClusterTestCase, crash, wait, skip_without'
39 40 def setup():
40 41 add_engines(3, total=True)
41 42
42 class TestView(ClusterTestCase):
43 class TestView(ClusterTestCase, ParametricTestCase):
43 44
44 45 def test_z_crash_mux(self):
45 46 """test graceful handling of engine death (direct)"""
@@ -421,16 +422,16 b' class TestView(ClusterTestCase):'
421 422 sys.stdout = sio
422 423 ip.magic_autopx()
423 424 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
424 ip.run_cell('print b')
425 ip.run_cell('b*=2')
426 ip.run_cell('print (b)')
425 427 ip.run_cell("b/c")
426 ip.run_code(compile('b*=2', '', 'single'))
427 428 ip.magic_autopx()
428 429 sys.stdout = savestdout
429 430 output = sio.getvalue().strip()
430 431 self.assertTrue(output.startswith('%autopx enabled'))
431 432 self.assertTrue(output.endswith('%autopx disabled'))
432 433 self.assertTrue('RemoteError: ZeroDivisionError' in output)
433 ar = v.get_result(-2)
434 ar = v.get_result(-1)
434 435 self.assertEquals(v['a'], 5)
435 436 self.assertEquals(v['b'], 20)
436 437 self.assertRaisesRemote(ZeroDivisionError, ar.get)
@@ -446,9 +447,10 b' class TestView(ClusterTestCase):'
446 447 sys.stdout = sio
447 448 ip.magic_autopx()
448 449 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
449 ip.run_cell('print b')
450 ip.run_cell('print (b)')
451 ip.run_cell('import time; time.sleep(0.1)')
450 452 ip.run_cell("b/c")
451 ip.run_code(compile('b*=2', '', 'single'))
453 ip.run_cell('b*=2')
452 454 ip.magic_autopx()
453 455 sys.stdout = savestdout
454 456 output = sio.getvalue().strip()
@@ -456,9 +458,12 b' class TestView(ClusterTestCase):'
456 458 self.assertTrue(output.endswith('%autopx disabled'))
457 459 self.assertFalse('ZeroDivisionError' in output)
458 460 ar = v.get_result(-2)
459 self.assertEquals(v['a'], 5)
460 self.assertEquals(v['b'], 20)
461 461 self.assertRaisesRemote(ZeroDivisionError, ar.get)
462 # prevent TaskAborted on pulls, due to ZeroDivisionError
463 time.sleep(0.5)
464 self.assertEquals(v['a'], 5)
465 # b*=2 will not fire, due to abort
466 self.assertEquals(v['b'], 10)
462 467
463 468 def test_magic_result(self):
464 469 ip = get_ipython()
@@ -550,4 +555,140 b' class TestView(ClusterTestCase):'
550 555 check = [ -1*i for i in r ]
551 556 result = e0.map_sync(lambda x: -1*x, r)
552 557 self.assertEquals(result, check)
558
559 def test_len(self):
560 """len(view) makes sense"""
561 e0 = self.client[self.client.ids[0]]
562 yield self.assertEquals(len(e0), 1)
563 v = self.client[:]
564 yield self.assertEquals(len(v), len(self.client.ids))
565 v = self.client.direct_view('all')
566 yield self.assertEquals(len(v), len(self.client.ids))
567 v = self.client[:2]
568 yield self.assertEquals(len(v), 2)
569 v = self.client[:1]
570 yield self.assertEquals(len(v), 1)
571 v = self.client.load_balanced_view()
572 yield self.assertEquals(len(v), len(self.client.ids))
573 # parametric tests seem to require manual closing?
574 self.client.close()
575
576
577 # begin execute tests
578 def _wait_for(self, f, timeout=10):
579 tic = time.time()
580 while time.time() <= tic + timeout:
581 if f():
582 return
583 time.sleep(0.1)
584 self.client.spin()
585 if not f():
586 print "Warning: Awaited condition never arrived"
587
588
589 def test_execute_reply(self):
590 e0 = self.client[self.client.ids[0]]
591 e0.block = True
592 ar = e0.execute("5", silent=False)
593 er = ar.get()
594 self._wait_for(lambda : bool(er.pyout))
595 self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
596 self.assertEquals(er.pyout['data']['text/plain'], '5')
597
598 def test_execute_reply_stdout(self):
599 e0 = self.client[self.client.ids[0]]
600 e0.block = True
601 ar = e0.execute("print (5)", silent=False)
602 er = ar.get()
603 self._wait_for(lambda : bool(er.stdout))
604 self.assertEquals(er.stdout.strip(), '5')
605
606 def test_execute_pyout(self):
607 """execute triggers pyout with silent=False"""
608 view = self.client[:]
609 ar = view.execute("5", silent=False, block=True)
610 self._wait_for(lambda : all(ar.pyout))
611
612 expected = [{'text/plain' : '5'}] * len(view)
613 mimes = [ out['data'] for out in ar.pyout ]
614 self.assertEquals(mimes, expected)
615
616 def test_execute_silent(self):
617 """execute does not trigger pyout with silent=True"""
618 view = self.client[:]
619 ar = view.execute("5", block=True)
620 expected = [None] * len(view)
621 self.assertEquals(ar.pyout, expected)
622
623 def test_execute_magic(self):
624 """execute accepts IPython commands"""
625 view = self.client[:]
626 view.execute("a = 5")
627 ar = view.execute("%whos", block=True)
628 # this will raise, if that failed
629 ar.get(5)
630 self._wait_for(lambda : all(ar.stdout))
631 for stdout in ar.stdout:
632 lines = stdout.splitlines()
633 self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
634 found = False
635 for line in lines[2:]:
636 split = line.split()
637 if split == ['a', 'int', '5']:
638 found = True
639 break
640 self.assertTrue(found, "whos output wrong: %s" % stdout)
641
642 def test_execute_displaypub(self):
643 """execute tracks display_pub output"""
644 view = self.client[:]
645 view.execute("from IPython.core.display import *")
646 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
647
648 self._wait_for(lambda : all(len(er.outputs) >= 5 for er in ar))
649 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
650 for outputs in ar.outputs:
651 mimes = [ out['data'] for out in outputs ]
652 self.assertEquals(mimes, expected)
653
654 def test_apply_displaypub(self):
655 """apply tracks display_pub output"""
656 view = self.client[:]
657 view.execute("from IPython.core.display import *")
658
659 @interactive
660 def publish():
661 [ display(i) for i in range(5) ]
662
663 ar = view.apply_async(publish)
664 ar.get(5)
665 self._wait_for(lambda : all(len(out) >= 5 for out in ar.outputs))
666 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
667 for outputs in ar.outputs:
668 mimes = [ out['data'] for out in outputs ]
669 self.assertEquals(mimes, expected)
670
671 def test_execute_raises(self):
672 """exceptions in execute requests raise appropriately"""
673 view = self.client[-1]
674 ar = view.execute("1/0")
675 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
676
677 @dec.skipif_not_matplotlib
678 def test_magic_pylab(self):
679 """%pylab works on engines"""
680 view = self.client[-1]
681 ar = view.execute("%pylab inline")
682 # at least check if this raised:
683 reply = ar.get(5)
684 # include imports, in case user config
685 ar = view.execute("plot(rand(100))", silent=False)
686 reply = ar.get(5)
687 self._wait_for(lambda : all(ar.outputs))
688 self.assertEquals(len(reply.outputs), 1)
689 output = reply.outputs[0]
690 self.assertTrue("data" in output)
691 data = output['data']
692 self.assertTrue("image/png" in data)
693
553 694
@@ -47,6 +47,9 b' from IPython.utils import py3compat'
47 47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
48 48 from IPython.utils.newserialized import serialize, unserialize
49 49 from IPython.zmq.log import EnginePUBHandler
50 from IPython.zmq.serialize import (
51 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
52 )
50 53
51 54 if py3compat.PY3:
52 55 buffer = memoryview
@@ -121,12 +124,6 b' def log_errors(f, self, *args, **kwargs):'
121 124 self.log.error("Uncaught exception in %r" % f, exc_info=True)
122 125
123 126
124 def asbytes(s):
125 """ensure that an object is ascii bytes"""
126 if isinstance(s, unicode):
127 s = s.encode('ascii')
128 return s
129
130 127 def is_url(url):
131 128 """boolean check for whether a string is a zmq url"""
132 129 if '://' not in url:
@@ -222,140 +219,6 b' def disambiguate_url(url, location=None):'
222 219
223 220 return "%s://%s:%s"%(proto,ip,port)
224 221
225 def serialize_object(obj, threshold=64e-6):
226 """Serialize an object into a list of sendable buffers.
227
228 Parameters
229 ----------
230
231 obj : object
232 The object to be serialized
233 threshold : float
234 The threshold for not double-pickling the content.
235
236
237 Returns
238 -------
239 ('pmd', [bufs]) :
240 where pmd is the pickled metadata wrapper,
241 bufs is a list of data buffers
242 """
243 databuffers = []
244 if isinstance(obj, (list, tuple)):
245 clist = canSequence(obj)
246 slist = map(serialize, clist)
247 for s in slist:
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
249 databuffers.append(s.getData())
250 s.data = None
251 return pickle.dumps(slist,-1), databuffers
252 elif isinstance(obj, dict):
253 sobj = {}
254 for k in sorted(obj.iterkeys()):
255 s = serialize(can(obj[k]))
256 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
257 databuffers.append(s.getData())
258 s.data = None
259 sobj[k] = s
260 return pickle.dumps(sobj,-1),databuffers
261 else:
262 s = serialize(can(obj))
263 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
264 databuffers.append(s.getData())
265 s.data = None
266 return pickle.dumps(s,-1),databuffers
267
268
269 def unserialize_object(bufs):
270 """reconstruct an object serialized by serialize_object from data buffers."""
271 bufs = list(bufs)
272 sobj = pickle.loads(bufs.pop(0))
273 if isinstance(sobj, (list, tuple)):
274 for s in sobj:
275 if s.data is None:
276 s.data = bufs.pop(0)
277 return uncanSequence(map(unserialize, sobj)), bufs
278 elif isinstance(sobj, dict):
279 newobj = {}
280 for k in sorted(sobj.iterkeys()):
281 s = sobj[k]
282 if s.data is None:
283 s.data = bufs.pop(0)
284 newobj[k] = uncan(unserialize(s))
285 return newobj, bufs
286 else:
287 if sobj.data is None:
288 sobj.data = bufs.pop(0)
289 return uncan(unserialize(sobj)), bufs
290
291 def pack_apply_message(f, args, kwargs, threshold=64e-6):
292 """pack up a function, args, and kwargs to be sent over the wire
293 as a series of buffers. Any object whose data is larger than `threshold`
294 will not have their data copied (currently only numpy arrays support zero-copy)"""
295 msg = [pickle.dumps(can(f),-1)]
296 databuffers = [] # for large objects
297 sargs, bufs = serialize_object(args,threshold)
298 msg.append(sargs)
299 databuffers.extend(bufs)
300 skwargs, bufs = serialize_object(kwargs,threshold)
301 msg.append(skwargs)
302 databuffers.extend(bufs)
303 msg.extend(databuffers)
304 return msg
305
306 def unpack_apply_message(bufs, g=None, copy=True):
307 """unpack f,args,kwargs from buffers packed by pack_apply_message()
308 Returns: original f,args,kwargs"""
309 bufs = list(bufs) # allow us to pop
310 assert len(bufs) >= 3, "not enough buffers!"
311 if not copy:
312 for i in range(3):
313 bufs[i] = bufs[i].bytes
314 cf = pickle.loads(bufs.pop(0))
315 sargs = list(pickle.loads(bufs.pop(0)))
316 skwargs = dict(pickle.loads(bufs.pop(0)))
317 # print sargs, skwargs
318 f = uncan(cf, g)
319 for sa in sargs:
320 if sa.data is None:
321 m = bufs.pop(0)
322 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
323 # always use a buffer, until memoryviews get sorted out
324 sa.data = buffer(m)
325 # disable memoryview support
326 # if copy:
327 # sa.data = buffer(m)
328 # else:
329 # sa.data = m.buffer
330 else:
331 if copy:
332 sa.data = m
333 else:
334 sa.data = m.bytes
335
336 args = uncanSequence(map(unserialize, sargs), g)
337 kwargs = {}
338 for k in sorted(skwargs.iterkeys()):
339 sa = skwargs[k]
340 if sa.data is None:
341 m = bufs.pop(0)
342 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
343 # always use a buffer, until memoryviews get sorted out
344 sa.data = buffer(m)
345 # disable memoryview support
346 # if copy:
347 # sa.data = buffer(m)
348 # else:
349 # sa.data = m.buffer
350 else:
351 if copy:
352 sa.data = m
353 else:
354 sa.data = m.bytes
355
356 kwargs[k] = uncan(unserialize(sa), g)
357
358 return f,args,kwargs
359 222
360 223 #--------------------------------------------------------------------------
361 224 # helpers for implementing old MEC API via view.apply
@@ -43,6 +43,7 b' class ZMQShellDisplayHook(DisplayHook):'
43 43 """A displayhook subclass that publishes data using ZeroMQ. This is intended
44 44 to work with an InteractiveShell instance. It sends a dict of different
45 45 representations of the object."""
46 topic=None
46 47
47 48 session = Instance(Session)
48 49 pub_socket = Instance('zmq.Socket')
@@ -67,6 +68,6 b' class ZMQShellDisplayHook(DisplayHook):'
67 68 """Finish up all displayhook activities."""
68 69 sys.stdout.flush()
69 70 sys.stderr.flush()
70 self.session.send(self.pub_socket, self.msg)
71 self.session.send(self.pub_socket, self.msg, ident=self.topic)
71 72 self.msg = None
72 73
@@ -20,8 +20,10 b' import sys'
20 20 import zmq
21 21
22 22 # Local imports.
23 from IPython.config.application import Application
23 24 from IPython.utils import io
24 25
26
25 27 #------------------------------------------------------------------------------
26 28 # Eventloops for integrating the Kernel into different GUIs
27 29 #------------------------------------------------------------------------------
@@ -164,7 +166,10 b' def loop_cocoa(kernel):'
164 166 # but still need a Poller for when there are no active windows,
165 167 # during which time mainloop() returns immediately
166 168 poller = zmq.Poller()
167 poller.register(kernel.shell_socket, zmq.POLLIN)
169 if kernel.control_stream:
170 poller.register(kernel.control_stream.socket, zmq.POLLIN)
171 for stream in kernel.shell_streams:
172 poller.register(stream.socket, zmq.POLLIN)
168 173
169 174 while True:
170 175 try:
@@ -203,11 +208,15 b' loop_map = {'
203 208
204 209 def enable_gui(gui, kernel=None):
205 210 """Enable integration with a given GUI"""
206 if kernel is None:
207 from .ipkernel import IPKernelApp
208 kernel = IPKernelApp.instance().kernel
209 211 if gui not in loop_map:
210 212 raise ValueError("GUI %r not supported" % gui)
213 if kernel is None:
214 if Application.initialized():
215 kernel = getattr(Application.instance(), 'kernel', None)
216 if kernel is None:
217 raise RuntimeError("You didn't specify a kernel,"
218 " and no IPython Application with a kernel appears to be running."
219 )
211 220 loop = loop_map[gui]
212 221 if kernel.eventloop is not None and kernel.eventloop is not loop:
213 222 raise RuntimeError("Cannot activate multiple GUI eventloops")
@@ -15,20 +15,26 b' Things to do:'
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 # Standard library imports.
18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 import uuid
26
27 from datetime import datetime
25 28 from signal import (
26 signal, default_int_handler, SIGINT, SIG_IGN
29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
27 30 )
28 # System library imports.
31
32 # System library imports
29 33 import zmq
34 from zmq.eventloop import ioloop
35 from zmq.eventloop.zmqstream import ZMQStream
30 36
31 # Local imports.
37 # Local imports
32 38 from IPython.core import pylabtools
33 39 from IPython.config.configurable import Configurable
34 40 from IPython.config.application import boolean_flag, catch_config_error
@@ -42,11 +48,12 b' from IPython.utils import py3compat'
42 48 from IPython.utils.frame import extract_module_locals
43 49 from IPython.utils.jsonutil import json_clean
44 50 from IPython.utils.traitlets import (
45 Any, Instance, Float, Dict, CaselessStrEnum
51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
46 52 )
47 53
48 54 from entry_point import base_launch_kernel
49 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 from serialize import serialize_object, unpack_apply_message
50 57 from session import Session, Message
51 58 from zmqshell import ZMQInteractiveShell
52 59
@@ -63,16 +70,21 b' class Kernel(Configurable):'
63 70
64 71 # attribute to override with a GUI
65 72 eventloop = Any(None)
73 def _eventloop_changed(self, name, old, new):
74 """schedule call to eventloop from IOLoop"""
75 loop = ioloop.IOLoop.instance()
76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
66 77
67 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
68 79 session = Instance(Session)
69 80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 shell_socket = Instance('zmq.Socket')
71 iopub_socket = Instance('zmq.Socket')
72 stdin_socket = Instance('zmq.Socket')
81 shell_streams = List()
82 control_stream = Instance(ZMQStream)
83 iopub_socket = Instance(zmq.Socket)
84 stdin_socket = Instance(zmq.Socket)
73 85 log = Instance(logging.Logger)
74 86
75 user_module = Instance('types.ModuleType')
87 user_module = Any()
76 88 def _user_module_changed(self, name, old, new):
77 89 if self.shell is not None:
78 90 self.shell.user_module = new
@@ -83,8 +95,16 b' class Kernel(Configurable):'
83 95 self.shell.user_ns = new
84 96 self.shell.init_user_ns()
85 97
86 # Private interface
98 # identities:
99 int_id = Integer(-1)
100 ident = Unicode()
101
102 def _ident_default(self):
103 return unicode(uuid.uuid4())
87 104
105
106 # Private interface
107
88 108 # Time to sleep after flushing the stdout/err buffers in each execute
89 109 # cycle. While this introduces a hard limit on the minimal latency of the
90 110 # execute cycle, it helps prevent output synchronization problems for
@@ -109,16 +129,14 b' class Kernel(Configurable):'
109 129 # This is a dict of port number that the kernel is listening on. It is set
110 130 # by record_ports and used by connect_request.
111 131 _recorded_ports = Dict()
112
132
133 # set of aborted msg_ids
134 aborted = Set()
113 135
114 136
115 137 def __init__(self, **kwargs):
116 138 super(Kernel, self).__init__(**kwargs)
117 139
118 # Before we even start up the shell, register *first* our exit handlers
119 # so they come before the shell's
120 atexit.register(self._at_shutdown)
121
122 140 # Initialize the InteractiveShell subclass
123 141 self.shell = ZMQInteractiveShell.instance(config=self.config,
124 142 profile_dir = self.profile_dir,
@@ -127,6 +145,7 b' class Kernel(Configurable):'
127 145 )
128 146 self.shell.displayhook.session = self.session
129 147 self.shell.displayhook.pub_socket = self.iopub_socket
148 self.shell.displayhook.topic = self._topic('pyout')
130 149 self.shell.display_pub.session = self.session
131 150 self.shell.display_pub.pub_socket = self.iopub_socket
132 151
@@ -136,96 +155,131 b' class Kernel(Configurable):'
136 155 # Build dict of handlers for message types
137 156 msg_types = [ 'execute_request', 'complete_request',
138 157 'object_info_request', 'history_request',
139 'connect_request', 'shutdown_request']
140 self.handlers = {}
158 'connect_request', 'shutdown_request',
159 'apply_request',
160 ]
161 self.shell_handlers = {}
141 162 for msg_type in msg_types:
142 self.handlers[msg_type] = getattr(self, msg_type)
143
144 def do_one_iteration(self):
145 """Do one iteration of the kernel's evaluation loop.
146 """
163 self.shell_handlers[msg_type] = getattr(self, msg_type)
164
165 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
166 self.control_handlers = {}
167 for msg_type in control_msg_types:
168 self.control_handlers[msg_type] = getattr(self, msg_type)
169
170 def dispatch_control(self, msg):
171 """dispatch control requests"""
172 idents,msg = self.session.feed_identities(msg, copy=False)
147 173 try:
148 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
149 except Exception:
150 self.log.warn("Invalid Message:", exc_info=True)
151 return
152 if msg is None:
174 msg = self.session.unserialize(msg, content=True, copy=False)
175 except:
176 self.log.error("Invalid Control Message", exc_info=True)
153 177 return
154 178
155 msg_type = msg['header']['msg_type']
179 self.log.debug("Control received: %s", msg)
180
181 header = msg['header']
182 msg_id = header['msg_id']
183 msg_type = header['msg_type']
156 184
157 # This assert will raise in versions of zeromq 2.0.7 and lesser.
158 # We now require 2.0.8 or above, so we can uncomment for safety.
159 # print(ident,msg, file=sys.__stdout__)
160 assert ident is not None, "Missing message part."
185 handler = self.control_handlers.get(msg_type, None)
186 if handler is None:
187 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
188 else:
189 try:
190 handler(self.control_stream, idents, msg)
191 except Exception:
192 self.log.error("Exception in control handler:", exc_info=True)
193
194 def dispatch_shell(self, stream, msg):
195 """dispatch shell requests"""
196 # flush control requests first
197 if self.control_stream:
198 self.control_stream.flush()
199
200 idents,msg = self.session.feed_identities(msg, copy=False)
201 try:
202 msg = self.session.unserialize(msg, content=True, copy=False)
203 except:
204 self.log.error("Invalid Message", exc_info=True)
205 return
161 206
207 header = msg['header']
208 msg_id = header['msg_id']
209 msg_type = msg['header']['msg_type']
210
162 211 # Print some info about this message and leave a '--->' marker, so it's
163 212 # easier to trace visually the message chain when debugging. Each
164 213 # handler prints its message at the end.
165 self.log.debug('\n*** MESSAGE TYPE:'+str(msg_type)+'***')
166 self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')
214 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
215 self.log.debug(' Content: %s\n --->\n ', msg['content'])
167 216
168 # Find and call actual handler for message
169 handler = self.handlers.get(msg_type, None)
217 if msg_id in self.aborted:
218 self.aborted.remove(msg_id)
219 # is it safe to assume a msg_id will not be resubmitted?
220 reply_type = msg_type.split('_')[0] + '_reply'
221 status = {'status' : 'aborted'}
222 sub = {'engine' : self.ident}
223 sub.update(status)
224 reply_msg = self.session.send(stream, reply_type, subheader=sub,
225 content=status, parent=msg, ident=idents)
226 return
227
228 handler = self.shell_handlers.get(msg_type, None)
170 229 if handler is None:
171 self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
230 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
172 231 else:
173 handler(ident, msg)
174
175 # Check whether we should exit, in case the incoming message set the
176 # exit flag on
177 if self.shell.exit_now:
178 self.log.debug('\nExiting IPython kernel...')
179 # We do a normal, clean exit, which allows any actions registered
180 # via atexit (such as history saving) to take place.
181 sys.exit(0)
182
183
184 def start(self):
185 """ Start the kernel main loop.
186 """
187 # a KeyboardInterrupt (SIGINT) can occur on any python statement, so
188 # let's ignore (SIG_IGN) them until we're in a place to handle them properly
189 signal(SIGINT,SIG_IGN)
190 poller = zmq.Poller()
191 poller.register(self.shell_socket, zmq.POLLIN)
192 # loop while self.eventloop has not been overridden
193 while self.eventloop is None:
232 # ensure default_int_handler during handler call
233 sig = signal(SIGINT, default_int_handler)
194 234 try:
195 # scale by extra factor of 10, because there is no
196 # reason for this to be anything less than ~ 0.1s
197 # since it is a real poller and will respond
198 # to events immediately
199
200 # double nested try/except, to properly catch KeyboardInterrupt
201 # due to pyzmq Issue #130
202 try:
203 poller.poll(10*1000*self._poll_interval)
204 # restore raising of KeyboardInterrupt
205 signal(SIGINT, default_int_handler)
206 self.do_one_iteration()
207 except:
208 raise
209 finally:
210 # prevent raising of KeyboardInterrupt
211 signal(SIGINT,SIG_IGN)
212 except KeyboardInterrupt:
213 # Ctrl-C shouldn't crash the kernel
214 io.raw_print("KeyboardInterrupt caught in kernel")
215 # stop ignoring sigint, now that we are out of our own loop,
216 # we don't want to prevent future code from handling it
235 handler(stream, idents, msg)
236 except Exception:
237 self.log.error("Exception in message handler:", exc_info=True)
238 finally:
239 signal(SIGINT, sig)
240
241 def enter_eventloop(self):
242 """enter eventloop"""
243 self.log.info("entering eventloop")
244 # restore default_int_handler
217 245 signal(SIGINT, default_int_handler)
218 246 while self.eventloop is not None:
219 247 try:
220 248 self.eventloop(self)
221 249 except KeyboardInterrupt:
222 250 # Ctrl-C shouldn't crash the kernel
223 io.raw_print("KeyboardInterrupt caught in kernel")
251 self.log.error("KeyboardInterrupt caught in kernel")
224 252 continue
225 253 else:
226 254 # eventloop exited cleanly, this means we should stop (right?)
227 255 self.eventloop = None
228 256 break
257 self.log.info("exiting eventloop")
258 # if eventloop exits, IOLoop should stop
259 ioloop.IOLoop.instance().stop()
260
261 def start(self):
262 """register dispatchers for streams"""
263 self.shell.exit_now = False
264 if self.control_stream:
265 self.control_stream.on_recv(self.dispatch_control, copy=False)
266
267 def make_dispatcher(stream):
268 def dispatcher(msg):
269 return self.dispatch_shell(stream, msg)
270 return dispatcher
271
272 for s in self.shell_streams:
273 s.on_recv(make_dispatcher(s), copy=False)
274
275 def do_one_iteration(self):
276 """step eventloop just once"""
277 if self.control_stream:
278 self.control_stream.flush()
279 for stream in self.shell_streams:
280 # handle at most one request per iteration
281 stream.flush(zmq.POLLIN, 1)
282 stream.flush(zmq.POLLOUT)
229 283
230 284
231 285 def record_ports(self, ports):
@@ -239,19 +293,31 b' class Kernel(Configurable):'
239 293 #---------------------------------------------------------------------------
240 294 # Kernel request handlers
241 295 #---------------------------------------------------------------------------
242
296
297 def _make_subheader(self):
298 """init subheader dict, for execute/apply_reply"""
299 return {
300 'dependencies_met' : True,
301 'engine' : self.ident,
302 'started': datetime.now(),
303 }
304
243 305 def _publish_pyin(self, code, parent, execution_count):
244 306 """Publish the code request on the pyin stream."""
245 307
246 self.session.send(self.iopub_socket, u'pyin', {u'code':code,
247 u'execution_count': execution_count}, parent=parent)
308 self.session.send(self.iopub_socket, u'pyin',
309 {u'code':code, u'execution_count': execution_count},
310 parent=parent, ident=self._topic('pyin')
311 )
248 312
249 def execute_request(self, ident, parent):
313 def execute_request(self, stream, ident, parent):
250 314
251 315 self.session.send(self.iopub_socket,
252 316 u'status',
253 317 {u'execution_state':u'busy'},
254 parent=parent )
318 parent=parent,
319 ident=self._topic('status'),
320 )
255 321
256 322 try:
257 323 content = parent[u'content']
@@ -259,8 +325,10 b' class Kernel(Configurable):'
259 325 silent = content[u'silent']
260 326 except:
261 327 self.log.error("Got bad msg: ")
262 self.log.error(str(Message(parent)))
328 self.log.error("%s", parent)
263 329 return
330
331 sub = self._make_subheader()
264 332
265 333 shell = self.shell # we'll need this a lot here
266 334
@@ -289,14 +357,8 b' class Kernel(Configurable):'
289 357
290 358 reply_content = {}
291 359 try:
292 if silent:
293 # run_code uses 'exec' mode, so no displayhook will fire, and it
294 # doesn't call logging or history manipulations. Print
295 # statements in that code will obviously still execute.
296 shell.run_code(code)
297 else:
298 # FIXME: the shell calls the exception handler itself.
299 shell.run_cell(code, store_history=True)
360 # FIXME: the shell calls the exception handler itself.
361 shell.run_cell(code, store_history=not silent, silent=silent)
300 362 except:
301 363 status = u'error'
302 364 # FIXME: this code right now isn't being used yet by default,
@@ -314,7 +376,7 b' class Kernel(Configurable):'
314 376 reply_content[u'status'] = status
315 377
316 378 # Return the execution counter so clients can display prompts
317 reply_content['execution_count'] = shell.execution_count -1
379 reply_content['execution_count'] = shell.execution_count - 1
318 380
319 381 # FIXME - fish exception info out of shell, possibly left there by
320 382 # runlines. We'll need to clean up this logic later.
@@ -327,9 +389,9 b' class Kernel(Configurable):'
327 389 # or not. If it did, we proceed to evaluate user_variables/expressions
328 390 if reply_content['status'] == 'ok':
329 391 reply_content[u'user_variables'] = \
330 shell.user_variables(content[u'user_variables'])
392 shell.user_variables(content.get(u'user_variables', []))
331 393 reply_content[u'user_expressions'] = \
332 shell.user_expressions(content[u'user_expressions'])
394 shell.user_expressions(content.get(u'user_expressions', {}))
333 395 else:
334 396 # If there was an error, don't even try to compute variables or
335 397 # expressions
@@ -355,40 +417,49 b' class Kernel(Configurable):'
355 417
356 418 # Send the reply.
357 419 reply_content = json_clean(reply_content)
358 reply_msg = self.session.send(self.shell_socket, u'execute_reply',
359 reply_content, parent, ident=ident)
360 self.log.debug(str(reply_msg))
420
421 sub['status'] = reply_content['status']
422 if reply_content['status'] == 'error' and \
423 reply_content['ename'] == 'UnmetDependency':
424 sub['dependencies_met'] = False
425
426 reply_msg = self.session.send(stream, u'execute_reply',
427 reply_content, parent, subheader=sub,
428 ident=ident)
429
430 self.log.debug("%s", reply_msg)
361 431
362 if reply_msg['content']['status'] == u'error':
363 self._abort_queue()
432 if not silent and reply_msg['content']['status'] == u'error':
433 self._abort_queues()
364 434
365 435 self.session.send(self.iopub_socket,
366 436 u'status',
367 437 {u'execution_state':u'idle'},
368 parent=parent )
438 parent=parent,
439 ident=self._topic('status'))
369 440
370 def complete_request(self, ident, parent):
441 def complete_request(self, stream, ident, parent):
371 442 txt, matches = self._complete(parent)
372 443 matches = {'matches' : matches,
373 444 'matched_text' : txt,
374 445 'status' : 'ok'}
375 446 matches = json_clean(matches)
376 completion_msg = self.session.send(self.shell_socket, 'complete_reply',
447 completion_msg = self.session.send(stream, 'complete_reply',
377 448 matches, parent, ident)
378 self.log.debug(str(completion_msg))
449 self.log.debug("%s", completion_msg)
379 450
380 def object_info_request(self, ident, parent):
451 def object_info_request(self, stream, ident, parent):
381 452 content = parent['content']
382 453 object_info = self.shell.object_inspect(content['oname'],
383 454 detail_level = content.get('detail_level', 0)
384 455 )
385 456 # Before we send this object over, we scrub it for JSON usage
386 457 oinfo = json_clean(object_info)
387 msg = self.session.send(self.shell_socket, 'object_info_reply',
458 msg = self.session.send(stream, 'object_info_reply',
388 459 oinfo, parent, ident)
389 self.log.debug(msg)
460 self.log.debug("%s", msg)
390 461
391 def history_request(self, ident, parent):
462 def history_request(self, stream, ident, parent):
392 463 # We need to pull these out, as passing **kwargs doesn't work with
393 464 # unicode keys before Python 2.6.5.
394 465 hist_access_type = parent['content']['hist_access_type']
@@ -416,51 +487,183 b' class Kernel(Configurable):'
416 487 hist = list(hist)
417 488 content = {'history' : hist}
418 489 content = json_clean(content)
419 msg = self.session.send(self.shell_socket, 'history_reply',
490 msg = self.session.send(stream, 'history_reply',
420 491 content, parent, ident)
421 492 self.log.debug("Sending history reply with %i entries", len(hist))
422 493
423 def connect_request(self, ident, parent):
494 def connect_request(self, stream, ident, parent):
424 495 if self._recorded_ports is not None:
425 496 content = self._recorded_ports.copy()
426 497 else:
427 498 content = {}
428 msg = self.session.send(self.shell_socket, 'connect_reply',
499 msg = self.session.send(stream, 'connect_reply',
429 500 content, parent, ident)
430 self.log.debug(msg)
501 self.log.debug("%s", msg)
431 502
432 def shutdown_request(self, ident, parent):
503 def shutdown_request(self, stream, ident, parent):
433 504 self.shell.exit_now = True
505 content = dict(status='ok')
506 content.update(parent['content'])
507 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
508 # same content, but different msg_id for broadcasting on IOPub
434 509 self._shutdown_message = self.session.msg(u'shutdown_reply',
435 parent['content'], parent)
436 sys.exit(0)
510 content, parent
511 )
512
513 self._at_shutdown()
514 # call sys.exit after a short delay
515 loop = ioloop.IOLoop.instance()
516 loop.add_timeout(time.time()+0.1, loop.stop)
517
518 #---------------------------------------------------------------------------
519 # Engine methods
520 #---------------------------------------------------------------------------
521
522 def apply_request(self, stream, ident, parent):
523 try:
524 content = parent[u'content']
525 bufs = parent[u'buffers']
526 msg_id = parent['header']['msg_id']
527 except:
528 self.log.error("Got bad msg: %s", parent, exc_info=True)
529 return
530
531 # Set the parent message of the display hook and out streams.
532 self.shell.displayhook.set_parent(parent)
533 self.shell.display_pub.set_parent(parent)
534 sys.stdout.set_parent(parent)
535 sys.stderr.set_parent(parent)
536
537 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
538 # self.iopub_socket.send(pyin_msg)
539 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
540 sub = self._make_subheader()
541 try:
542 working = self.shell.user_ns
543
544 prefix = "_"+str(msg_id).replace("-","")+"_"
545
546 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
547
548 fname = getattr(f, '__name__', 'f')
549
550 fname = prefix+"f"
551 argname = prefix+"args"
552 kwargname = prefix+"kwargs"
553 resultname = prefix+"result"
554
555 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
556 # print ns
557 working.update(ns)
558 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
559 try:
560 exec code in self.shell.user_global_ns, self.shell.user_ns
561 result = working.get(resultname)
562 finally:
563 for key in ns.iterkeys():
564 working.pop(key)
565
566 packed_result,buf = serialize_object(result)
567 result_buf = [packed_result]+buf
568 except:
569 exc_content = self._wrap_exception('apply')
570 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
571 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
572 ident=self._topic('pyerr'))
573 reply_content = exc_content
574 result_buf = []
575
576 if exc_content['ename'] == 'UnmetDependency':
577 sub['dependencies_met'] = False
578 else:
579 reply_content = {'status' : 'ok'}
580
581 # put 'ok'/'error' status in header, for scheduler introspection:
582 sub['status'] = reply_content['status']
583
584 # flush i/o
585 sys.stdout.flush()
586 sys.stderr.flush()
587
588 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
589 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
590
591 #---------------------------------------------------------------------------
592 # Control messages
593 #---------------------------------------------------------------------------
594
595 def abort_request(self, stream, ident, parent):
596 """abort a specifig msg by id"""
597 msg_ids = parent['content'].get('msg_ids', None)
598 if isinstance(msg_ids, basestring):
599 msg_ids = [msg_ids]
600 if not msg_ids:
601 self.abort_queues()
602 for mid in msg_ids:
603 self.aborted.add(str(mid))
604
605 content = dict(status='ok')
606 reply_msg = self.session.send(stream, 'abort_reply', content=content,
607 parent=parent, ident=ident)
608 self.log.debug("%s", reply_msg)
609
610 def clear_request(self, stream, idents, parent):
611 """Clear our namespace."""
612 self.shell.reset(False)
613 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
614 content = dict(status='ok'))
615
437 616
438 617 #---------------------------------------------------------------------------
439 618 # Protected interface
440 619 #---------------------------------------------------------------------------
441 620
442 def _abort_queue(self):
621
622 def _wrap_exception(self, method=None):
623 # import here, because _wrap_exception is only used in parallel,
624 # and parallel has higher min pyzmq version
625 from IPython.parallel.error import wrap_exception
626 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
627 content = wrap_exception(e_info)
628 return content
629
630 def _topic(self, topic):
631 """prefixed topic for IOPub messages"""
632 if self.int_id >= 0:
633 base = "engine.%i" % self.int_id
634 else:
635 base = "kernel.%s" % self.ident
636
637 return py3compat.cast_bytes("%s.%s" % (base, topic))
638
639 def _abort_queues(self):
640 for stream in self.shell_streams:
641 if stream:
642 self._abort_queue(stream)
643
644 def _abort_queue(self, stream):
645 poller = zmq.Poller()
646 poller.register(stream.socket, zmq.POLLIN)
443 647 while True:
444 try:
445 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
446 except Exception:
447 self.log.warn("Invalid Message:", exc_info=True)
448 continue
648 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
449 649 if msg is None:
450 break
451 else:
452 assert ident is not None, \
453 "Unexpected missing message part."
650 return
454 651
455 self.log.debug("Aborting:\n"+str(Message(msg)))
652 self.log.info("Aborting:")
653 self.log.info("%s", msg)
456 654 msg_type = msg['header']['msg_type']
457 655 reply_type = msg_type.split('_')[0] + '_reply'
458 reply_msg = self.session.send(self.shell_socket, reply_type,
459 {'status' : 'aborted'}, msg, ident=ident)
460 self.log.debug(reply_msg)
656
657 status = {'status' : 'aborted'}
658 sub = {'engine' : self.ident}
659 sub.update(status)
660 reply_msg = self.session.send(stream, reply_type, subheader=sub,
661 content=status, parent=msg, ident=idents)
662 self.log.debug("%s", reply_msg)
461 663 # We need to wait a bit for requests to come in. This can probably
462 664 # be set shorter for true asynchronous clients.
463 time.sleep(0.1)
665 poller.poll(50)
666
464 667
465 668 def _no_raw_input(self):
466 669 """Raise StdinNotImplentedError if active frontend doesn't support
@@ -490,7 +693,7 b' class Kernel(Configurable):'
490 693 value = reply['content']['value']
491 694 except:
492 695 self.log.error("Got bad raw_input reply: ")
493 self.log.error(str(Message(parent)))
696 self.log.error("%s", parent)
494 697 value = ''
495 698 if value == '\x04':
496 699 # EOF
@@ -545,12 +748,9 b' class Kernel(Configurable):'
545 748 """
546 749 # io.rprint("Kernel at_shutdown") # dbg
547 750 if self._shutdown_message is not None:
548 self.session.send(self.shell_socket, self._shutdown_message)
549 self.session.send(self.iopub_socket, self._shutdown_message)
550 self.log.debug(str(self._shutdown_message))
551 # A very short sleep to give zmq time to flush its message buffers
552 # before Python truly shuts down.
553 time.sleep(0.01)
751 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
752 self.log.debug("%s", self._shutdown_message)
753 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
554 754
555 755 #-----------------------------------------------------------------------------
556 756 # Aliases and Flags for the IPKernelApp
@@ -603,9 +803,11 b' class IPKernelApp(KernelApp, InteractiveShellApp):'
603 803 self.init_code()
604 804
605 805 def init_kernel(self):
806
807 shell_stream = ZMQStream(self.shell_socket)
606 808
607 809 kernel = Kernel(config=self.config, session=self.session,
608 shell_socket=self.shell_socket,
810 shell_streams=[shell_stream],
609 811 iopub_socket=self.iopub_socket,
610 812 stdin_socket=self.stdin_socket,
611 813 log=self.log,
@@ -687,6 +889,13 b' def embed_kernel(module=None, local_ns=None, **kwargs):'
687 889 else:
688 890 app = IPKernelApp.instance(**kwargs)
689 891 app.initialize([])
892 # Undo unnecessary sys module mangling from init_sys_modules.
893 # This would not be necessary if we could prevent it
894 # in the first place by using a different InteractiveShell
895 # subclass, as in the regular embed case.
896 main = app.kernel.shell._orig_sys_modules_main_mod
897 if main is not None:
898 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
690 899
691 900 # load the calling scope if not given
692 901 (caller_module, caller_locals) = extract_module_locals(1)
@@ -15,15 +15,18 b' Authors'
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 # Standard library imports.
18 # Standard library imports
19 import atexit
19 20 import json
20 21 import os
21 22 import sys
23 import signal
22 24
23 # System library imports.
25 # System library imports
24 26 import zmq
27 from zmq.eventloop import ioloop
25 28
26 # IPython imports.
29 # IPython imports
27 30 from IPython.core.ultratb import FormattedTB
28 31 from IPython.core.application import (
29 32 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
@@ -82,17 +85,18 b' kernel_flags.update(session_flags)'
82 85 #-----------------------------------------------------------------------------
83 86
84 87 class KernelApp(BaseIPythonApplication):
85 name='pykernel'
88 name='ipkernel'
86 89 aliases = Dict(kernel_aliases)
87 90 flags = Dict(kernel_flags)
88 91 classes = [Session]
89 92 # the kernel class, as an importstring
90 kernel_class = DottedObjectName('IPython.zmq.pykernel.Kernel')
93 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
91 94 kernel = Any()
92 95 poller = Any() # don't restrict this even though current pollers are all Threads
93 96 heartbeat = Instance(Heartbeat)
94 97 session = Instance('IPython.zmq.session.Session')
95 98 ports = Dict()
99 _full_connection_file = Unicode()
96 100
97 101 # inherit config file name from parent:
98 102 parent_appname = Unicode(config=True)
@@ -163,6 +167,8 b' class KernelApp(BaseIPythonApplication):'
163 167 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
164 168 except IOError:
165 169 self.log.debug("Connection file not found: %s", self.connection_file)
170 # This means I own it, so I will clean it up:
171 atexit.register(self.cleanup_connection_file)
166 172 return
167 173 self.log.debug(u"Loading connection file %s", fname)
168 174 with open(fname) as f:
@@ -188,6 +194,16 b' class KernelApp(BaseIPythonApplication):'
188 194 write_connection_file(cf, ip=self.ip, key=self.session.key,
189 195 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
190 196 iopub_port=self.iopub_port)
197
198 self._full_connection_file = cf
199
200 def cleanup_connection_file(self):
201 cf = self._full_connection_file
202 self.log.debug("cleaning up connection file: %r", cf)
203 try:
204 os.remove(cf)
205 except (IOError, OSError):
206 pass
191 207
192 208 def init_connection_file(self):
193 209 if not self.connection_file:
@@ -216,18 +232,23 b' class KernelApp(BaseIPythonApplication):'
216 232 self.stdin_socket = context.socket(zmq.ROUTER)
217 233 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
218 234 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
219
235
236 def init_heartbeat(self):
237 """start the heart beating"""
220 238 # heartbeat doesn't share context, because it mustn't be blocked
221 239 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
222 240 hb_ctx = zmq.Context()
223 241 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
224 242 self.hb_port = self.heartbeat.port
225 243 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
244 self.heartbeat.start()
226 245
227 246 # Helper to make it easier to connect to an existing kernel.
228 247 # set log-level to critical, to make sure it is output
229 248 self.log.critical("To connect another client to this kernel, use:")
230
249
250 def log_connection_info(self):
251 """display connection info, and store ports"""
231 252 basename = os.path.basename(self.connection_file)
232 253 if basename == self.connection_file or \
233 254 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
@@ -267,6 +288,9 b' class KernelApp(BaseIPythonApplication):'
267 288 displayhook_factory = import_item(str(self.displayhook_class))
268 289 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
269 290
291 def init_signal(self):
292 signal.signal(signal.SIGINT, signal.SIG_IGN)
293
270 294 def init_kernel(self):
271 295 """Create the Kernel object itself"""
272 296 kernel_factory = import_item(str(self.kernel_class))
@@ -286,9 +310,12 b' class KernelApp(BaseIPythonApplication):'
286 310 self.init_session()
287 311 self.init_poller()
288 312 self.init_sockets()
289 # writing connection file must be *after* init_sockets
313 self.init_heartbeat()
314 # writing/displaying connection info must be *after* init_sockets/heartbeat
315 self.log_connection_info()
290 316 self.write_connection_file()
291 317 self.init_io()
318 self.init_signal()
292 319 self.init_kernel()
293 320 # flush stdout/stderr, so that anything written to these streams during
294 321 # initialization do not get associated with the first execution request
@@ -296,11 +323,11 b' class KernelApp(BaseIPythonApplication):'
296 323 sys.stderr.flush()
297 324
298 325 def start(self):
299 self.heartbeat.start()
300 326 if self.poller is not None:
301 327 self.poller.start()
328 self.kernel.start()
302 329 try:
303 self.kernel.start()
330 ioloop.IOLoop.instance().start()
304 331 except KeyboardInterrupt:
305 332 pass
306 333
@@ -781,9 +781,6 b' class KernelManager(HasTraits):'
781 781
782 782 Parameters:
783 783 -----------
784 ipython : bool, optional (default True)
785 Whether to use an IPython kernel instead of a plain Python kernel.
786
787 784 launcher : callable, optional (default None)
788 785 A custom function for launching the kernel process (generally a
789 786 wrapper around ``entry_point.base_launch_kernel``). In most cases,
@@ -805,10 +802,7 b' class KernelManager(HasTraits):'
805 802 self._launch_args = kw.copy()
806 803 launch_kernel = kw.pop('launcher', None)
807 804 if launch_kernel is None:
808 if kw.pop('ipython', True):
809 from ipkernel import launch_kernel
810 else:
811 from pykernel import launch_kernel
805 from ipkernel import launch_kernel
812 806 self.kernel = launch_kernel(fname=self.connection_file, **kw)
813 807
814 808 def shutdown_kernel(self, restart=False):
@@ -629,7 +629,7 b' class Session(Configurable):'
629 629 if isinstance(socket, ZMQStream):
630 630 socket = socket.socket
631 631 try:
632 msg_list = socket.recv_multipart(mode)
632 msg_list = socket.recv_multipart(mode, copy=copy)
633 633 except zmq.ZMQError as e:
634 634 if e.errno == zmq.EAGAIN:
635 635 # We can convert EAGAIN to None as we know in this case
@@ -23,7 +23,7 b' from subprocess import Popen, PIPE'
23 23 import nose.tools as nt
24 24
25 25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 from IPython.utils import path
26 from IPython.utils import path, py3compat
27 27
28 28
29 29 #-------------------------------------------------------------------------------
@@ -68,17 +68,19 b' def setup_kernel(cmd):'
68 68 )
69 69 # wait for connection file to exist, timeout after 5s
70 70 tic = time.time()
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
72 72 time.sleep(0.1)
73 73
74 if kernel.poll() is not None:
75 o,e = kernel.communicate()
76 e = py3compat.cast_unicode(e)
77 raise IOError("Kernel failed to start:\n%s" % e)
78
74 79 if not os.path.exists(connection_file):
75 80 if kernel.poll() is None:
76 81 kernel.terminate()
77 82 raise IOError("Connection file %r never arrived" % connection_file)
78 83
79 if kernel.poll() is not None:
80 raise IOError("Kernel failed to start")
81
82 84 km = BlockingKernelManager(connection_file=connection_file)
83 85 km.load_connection_file()
84 86 km.start_channels()
@@ -157,3 +159,33 b' def test_embed_kernel_namespace():'
157 159 content = msg['content']
158 160 nt.assert_false(content['found'])
159 161
162 def test_embed_kernel_reentrant():
163 """IPython.embed_kernel() can be called multiple times"""
164 cmd = '\n'.join([
165 'from IPython import embed_kernel',
166 'count = 0',
167 'def go():',
168 ' global count',
169 ' embed_kernel()',
170 ' count = count + 1',
171 '',
172 'while True:'
173 ' go()',
174 '',
175 ])
176
177 with setup_kernel(cmd) as km:
178 shell = km.shell_channel
179 for i in range(5):
180 msg_id = shell.object_info('count')
181 msg = shell.get_msg(block=True, timeout=2)
182 content = msg['content']
183 nt.assert_true(content['found'])
184 nt.assert_equals(content['string_form'], unicode(i))
185
186 # exit from embed_kernel
187 shell.execute("get_ipython().exit_now = True")
188 msg = shell.get_msg(block=True, timeout=2)
189 time.sleep(0.2)
190
191
@@ -19,8 +19,12 b' from __future__ import print_function'
19 19 import inspect
20 20 import os
21 21 import sys
22 import time
22 23 from subprocess import Popen, PIPE
23 24
25 # System library imports
26 from zmq.eventloop import ioloop
27
24 28 # Our own
25 29 from IPython.core.interactiveshell import (
26 30 InteractiveShell, InteractiveShellABC
@@ -39,7 +43,7 b' from IPython.utils import io'
39 43 from IPython.utils.jsonutil import json_clean
40 44 from IPython.utils.path import get_py_filename
41 45 from IPython.utils.process import arg_split
42 from IPython.utils.traitlets import Instance, Type, Dict, CBool
46 from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes
43 47 from IPython.utils.warn import warn, error
44 48 from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary
45 49 from IPython.zmq.session import extract_header
@@ -56,6 +60,7 b' class ZMQDisplayPublisher(DisplayPublisher):'
56 60 session = Instance(Session)
57 61 pub_socket = Instance('zmq.Socket')
58 62 parent_header = Dict({})
63 topic = CBytes(b'displaypub')
59 64
60 65 def set_parent(self, parent):
61 66 """Set the parent for outbound messages."""
@@ -78,7 +83,7 b' class ZMQDisplayPublisher(DisplayPublisher):'
78 83 content['metadata'] = metadata
79 84 self.session.send(
80 85 self.pub_socket, u'display_data', json_clean(content),
81 parent=self.parent_header
86 parent=self.parent_header, ident=self.topic,
82 87 )
83 88
84 89 def clear_output(self, stdout=True, stderr=True, other=True):
@@ -93,7 +98,7 b' class ZMQDisplayPublisher(DisplayPublisher):'
93 98
94 99 self.session.send(
95 100 self.pub_socket, u'clear_output', content,
96 parent=self.parent_header
101 parent=self.parent_header, ident=self.topic,
97 102 )
98 103
99 104 class ZMQInteractiveShell(InteractiveShell):
@@ -114,6 +119,12 b' class ZMQInteractiveShell(InteractiveShell):'
114 119 exiter = Instance(ZMQExitAutocall)
115 120 def _exiter_default(self):
116 121 return ZMQExitAutocall(self)
122
123 def _exit_now_changed(self, name, old, new):
124 """stop eventloop when exit_now fires"""
125 if new:
126 loop = ioloop.IOLoop.instance()
127 loop.add_timeout(time.time()+0.1, loop.stop)
117 128
118 129 keepkernel_on_exit = None
119 130
@@ -154,6 +165,7 b' class ZMQInteractiveShell(InteractiveShell):'
154 165
155 166 def ask_exit(self):
156 167 """Engage the exit actions."""
168 self.exit_now = True
157 169 payload = dict(
158 170 source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit',
159 171 exit=True,
@@ -172,7 +184,11 b' class ZMQInteractiveShell(InteractiveShell):'
172 184 dh = self.displayhook
173 185 # Send exception info over pub socket for other clients than the caller
174 186 # to pick up
175 exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header)
187 topic = None
188 if dh.topic:
189 topic = dh.topic.replace(b'pyout', b'pyerr')
190
191 exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header, ident=topic)
176 192
177 193 # FIXME - Hack: store exception info in shell object. Right now, the
178 194 # caller is reading this info after the fact, we need to fix this logic
@@ -34,7 +34,7 b' TaskRecord keys:'
34 34 =============== =============== =============
35 35 Key Type Description
36 36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
37 msg_id uuid(ascii) The msg ID
38 38 header dict The request header
39 39 content dict The request content (likely empty)
40 40 buffers list(bytes) buffers containing serialized request objects
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket"
43 43 engine_uuid uuid(bytes) IDENT of engine's socket
44 44 started datetime time task began execution on engine
45 45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
46 resubmitted uuid(ascii) msg_id of resubmitted task (if applicable)
47 47 result_header dict header for result
48 48 result_content dict content for result
49 49 result_buffers list(bytes) buffers containing serialized request objects
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now