Show More
@@ -0,0 +1,179 b'' | |||
|
1 | """serialization utilities for apply messages | |
|
2 | ||
|
3 | Authors: | |
|
4 | ||
|
5 | * Min RK | |
|
6 | """ | |
|
7 | #----------------------------------------------------------------------------- | |
|
8 | # Copyright (C) 2010-2011 The IPython Development Team | |
|
9 | # | |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
|
11 | # the file COPYING, distributed as part of this software. | |
|
12 | #----------------------------------------------------------------------------- | |
|
13 | ||
|
14 | #----------------------------------------------------------------------------- | |
|
15 | # Imports | |
|
16 | #----------------------------------------------------------------------------- | |
|
17 | ||
|
18 | # Standard library imports | |
|
19 | import logging | |
|
20 | import os | |
|
21 | import re | |
|
22 | import socket | |
|
23 | import sys | |
|
24 | ||
|
25 | try: | |
|
26 | import cPickle | |
|
27 | pickle = cPickle | |
|
28 | except: | |
|
29 | cPickle = None | |
|
30 | import pickle | |
|
31 | ||
|
32 | ||
|
33 | # IPython imports | |
|
34 | from IPython.utils import py3compat | |
|
35 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence | |
|
36 | from IPython.utils.newserialized import serialize, unserialize | |
|
37 | ||
|
38 | if py3compat.PY3: | |
|
39 | buffer = memoryview | |
|
40 | ||
|
41 | #----------------------------------------------------------------------------- | |
|
42 | # Serialization Functions | |
|
43 | #----------------------------------------------------------------------------- | |
|
44 | ||
|
45 | def serialize_object(obj, threshold=64e-6): | |
|
46 | """Serialize an object into a list of sendable buffers. | |
|
47 | ||
|
48 | Parameters | |
|
49 | ---------- | |
|
50 | ||
|
51 | obj : object | |
|
52 | The object to be serialized | |
|
53 | threshold : float | |
|
54 | The threshold for not double-pickling the content. | |
|
55 | ||
|
56 | ||
|
57 | Returns | |
|
58 | ------- | |
|
59 | ('pmd', [bufs]) : | |
|
60 | where pmd is the pickled metadata wrapper, | |
|
61 | bufs is a list of data buffers | |
|
62 | """ | |
|
63 | databuffers = [] | |
|
64 | if isinstance(obj, (list, tuple)): | |
|
65 | clist = canSequence(obj) | |
|
66 | slist = map(serialize, clist) | |
|
67 | for s in slist: | |
|
68 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |
|
69 | databuffers.append(s.getData()) | |
|
70 | s.data = None | |
|
71 | return pickle.dumps(slist,-1), databuffers | |
|
72 | elif isinstance(obj, dict): | |
|
73 | sobj = {} | |
|
74 | for k in sorted(obj.iterkeys()): | |
|
75 | s = serialize(can(obj[k])) | |
|
76 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |
|
77 | databuffers.append(s.getData()) | |
|
78 | s.data = None | |
|
79 | sobj[k] = s | |
|
80 | return pickle.dumps(sobj,-1),databuffers | |
|
81 | else: | |
|
82 | s = serialize(can(obj)) | |
|
83 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |
|
84 | databuffers.append(s.getData()) | |
|
85 | s.data = None | |
|
86 | return pickle.dumps(s,-1),databuffers | |
|
87 | ||
|
88 | ||
|
89 | def unserialize_object(bufs): | |
|
90 | """reconstruct an object serialized by serialize_object from data buffers.""" | |
|
91 | bufs = list(bufs) | |
|
92 | sobj = pickle.loads(bufs.pop(0)) | |
|
93 | if isinstance(sobj, (list, tuple)): | |
|
94 | for s in sobj: | |
|
95 | if s.data is None: | |
|
96 | s.data = bufs.pop(0) | |
|
97 | return uncanSequence(map(unserialize, sobj)), bufs | |
|
98 | elif isinstance(sobj, dict): | |
|
99 | newobj = {} | |
|
100 | for k in sorted(sobj.iterkeys()): | |
|
101 | s = sobj[k] | |
|
102 | if s.data is None: | |
|
103 | s.data = bufs.pop(0) | |
|
104 | newobj[k] = uncan(unserialize(s)) | |
|
105 | return newobj, bufs | |
|
106 | else: | |
|
107 | if sobj.data is None: | |
|
108 | sobj.data = bufs.pop(0) | |
|
109 | return uncan(unserialize(sobj)), bufs | |
|
110 | ||
|
111 | def pack_apply_message(f, args, kwargs, threshold=64e-6): | |
|
112 | """pack up a function, args, and kwargs to be sent over the wire | |
|
113 | as a series of buffers. Any object whose data is larger than `threshold` | |
|
114 | will not have their data copied (currently only numpy arrays support zero-copy)""" | |
|
115 | msg = [pickle.dumps(can(f),-1)] | |
|
116 | databuffers = [] # for large objects | |
|
117 | sargs, bufs = serialize_object(args,threshold) | |
|
118 | msg.append(sargs) | |
|
119 | databuffers.extend(bufs) | |
|
120 | skwargs, bufs = serialize_object(kwargs,threshold) | |
|
121 | msg.append(skwargs) | |
|
122 | databuffers.extend(bufs) | |
|
123 | msg.extend(databuffers) | |
|
124 | return msg | |
|
125 | ||
|
126 | def unpack_apply_message(bufs, g=None, copy=True): | |
|
127 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |
|
128 | Returns: original f,args,kwargs""" | |
|
129 | bufs = list(bufs) # allow us to pop | |
|
130 | assert len(bufs) >= 3, "not enough buffers!" | |
|
131 | if not copy: | |
|
132 | for i in range(3): | |
|
133 | bufs[i] = bufs[i].bytes | |
|
134 | cf = pickle.loads(bufs.pop(0)) | |
|
135 | sargs = list(pickle.loads(bufs.pop(0))) | |
|
136 | skwargs = dict(pickle.loads(bufs.pop(0))) | |
|
137 | # print sargs, skwargs | |
|
138 | f = uncan(cf, g) | |
|
139 | for sa in sargs: | |
|
140 | if sa.data is None: | |
|
141 | m = bufs.pop(0) | |
|
142 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |
|
143 | # always use a buffer, until memoryviews get sorted out | |
|
144 | sa.data = buffer(m) | |
|
145 | # disable memoryview support | |
|
146 | # if copy: | |
|
147 | # sa.data = buffer(m) | |
|
148 | # else: | |
|
149 | # sa.data = m.buffer | |
|
150 | else: | |
|
151 | if copy: | |
|
152 | sa.data = m | |
|
153 | else: | |
|
154 | sa.data = m.bytes | |
|
155 | ||
|
156 | args = uncanSequence(map(unserialize, sargs), g) | |
|
157 | kwargs = {} | |
|
158 | for k in sorted(skwargs.iterkeys()): | |
|
159 | sa = skwargs[k] | |
|
160 | if sa.data is None: | |
|
161 | m = bufs.pop(0) | |
|
162 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |
|
163 | # always use a buffer, until memoryviews get sorted out | |
|
164 | sa.data = buffer(m) | |
|
165 | # disable memoryview support | |
|
166 | # if copy: | |
|
167 | # sa.data = buffer(m) | |
|
168 | # else: | |
|
169 | # sa.data = m.buffer | |
|
170 | else: | |
|
171 | if copy: | |
|
172 | sa.data = m | |
|
173 | else: | |
|
174 | sa.data = m.bytes | |
|
175 | ||
|
176 | kwargs[k] = uncan(unserialize(sa), g) | |
|
177 | ||
|
178 | return f,args,kwargs | |
|
179 |
@@ -133,6 +133,30 b' class Application(SingletonConfigurable):' | |||
|
133 | 133 | self.log_level = new |
|
134 | 134 | self.log.setLevel(new) |
|
135 | 135 | |
|
136 | log_format = Unicode("[%(name)s] %(message)s", config=True, | |
|
137 | help="The Logging format template", | |
|
138 | ) | |
|
139 | log = Instance(logging.Logger) | |
|
140 | def _log_default(self): | |
|
141 | """Start logging for this application. | |
|
142 | ||
|
143 | The default is to log to stdout using a StreaHandler. The log level | |
|
144 | starts at loggin.WARN, but this can be adjusted by setting the | |
|
145 | ``log_level`` attribute. | |
|
146 | """ | |
|
147 | log = logging.getLogger(self.__class__.__name__) | |
|
148 | log.setLevel(self.log_level) | |
|
149 | if sys.executable.endswith('pythonw.exe'): | |
|
150 | # this should really go to a file, but file-logging is only | |
|
151 | # hooked up in parallel applications | |
|
152 | _log_handler = logging.StreamHandler(open(os.devnull, 'w')) | |
|
153 | else: | |
|
154 | _log_handler = logging.StreamHandler() | |
|
155 | _log_formatter = logging.Formatter(self.log_format) | |
|
156 | _log_handler.setFormatter(_log_formatter) | |
|
157 | log.addHandler(_log_handler) | |
|
158 | return log | |
|
159 | ||
|
136 | 160 | # the alias map for configurables |
|
137 | 161 | aliases = Dict({'log-level' : 'Application.log_level'}) |
|
138 | 162 | |
@@ -169,32 +193,11 b' class Application(SingletonConfigurable):' | |||
|
169 | 193 | if self.__class__ not in self.classes: |
|
170 | 194 | self.classes.insert(0, self.__class__) |
|
171 | 195 | |
|
172 | self.init_logging() | |
|
173 | ||
|
174 | 196 | def _config_changed(self, name, old, new): |
|
175 | 197 | SingletonConfigurable._config_changed(self, name, old, new) |
|
176 | 198 | self.log.debug('Config changed:') |
|
177 | 199 | self.log.debug(repr(new)) |
|
178 | 200 | |
|
179 | def init_logging(self): | |
|
180 | """Start logging for this application. | |
|
181 | ||
|
182 | The default is to log to stdout using a StreaHandler. The log level | |
|
183 | starts at loggin.WARN, but this can be adjusted by setting the | |
|
184 | ``log_level`` attribute. | |
|
185 | """ | |
|
186 | self.log = logging.getLogger(self.__class__.__name__) | |
|
187 | self.log.setLevel(self.log_level) | |
|
188 | if sys.executable.endswith('pythonw.exe'): | |
|
189 | # this should really go to a file, but file-logging is only | |
|
190 | # hooked up in parallel applications | |
|
191 | self._log_handler = logging.StreamHandler(open(os.devnull, 'w')) | |
|
192 | else: | |
|
193 | self._log_handler = logging.StreamHandler() | |
|
194 | self._log_formatter = logging.Formatter("[%(name)s] %(message)s") | |
|
195 | self._log_handler.setFormatter(self._log_formatter) | |
|
196 | self.log.addHandler(self._log_handler) | |
|
197 | ||
|
198 | 201 | @catch_config_error |
|
199 | 202 | def initialize(self, argv=None): |
|
200 | 203 | """Do the basic steps to configure me. |
@@ -403,6 +403,7 b' class HistoryManager(HistoryAccessor):' | |||
|
403 | 403 | self.save_flag = threading.Event() |
|
404 | 404 | self.db_input_cache_lock = threading.Lock() |
|
405 | 405 | self.db_output_cache_lock = threading.Lock() |
|
406 | if self.hist_file != ':memory:': | |
|
406 | 407 | self.save_thread = HistorySavingThread(self) |
|
407 | 408 | self.save_thread.start() |
|
408 | 409 |
@@ -712,6 +712,7 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||
|
712 | 712 | self._orig_sys_module_state['stderr'] = sys.stderr |
|
713 | 713 | self._orig_sys_module_state['excepthook'] = sys.excepthook |
|
714 | 714 | self._orig_sys_modules_main_name = self.user_module.__name__ |
|
715 | self._orig_sys_modules_main_mod = sys.modules.get(self.user_module.__name__) | |
|
715 | 716 | |
|
716 | 717 | def restore_sys_module_state(self): |
|
717 | 718 | """Restore the state of the sys module.""" |
@@ -721,7 +722,8 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||
|
721 | 722 | except AttributeError: |
|
722 | 723 | pass |
|
723 | 724 | # Reset what what done in self.init_sys_modules |
|
724 | sys.modules[self.user_module.__name__] = self._orig_sys_modules_main_name | |
|
725 | if self._orig_sys_modules_main_mod is not None: | |
|
726 | sys.modules[self._orig_sys_modules_main_name] = self._orig_sys_modules_main_mod | |
|
725 | 727 | |
|
726 | 728 | #------------------------------------------------------------------------- |
|
727 | 729 | # Things related to hooks |
@@ -2424,7 +2426,7 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||
|
2424 | 2426 | self.showtraceback() |
|
2425 | 2427 | warn('Unknown failure executing module: <%s>' % mod_name) |
|
2426 | 2428 | |
|
2427 | def run_cell(self, raw_cell, store_history=False): | |
|
2429 | def run_cell(self, raw_cell, store_history=False, silent=False): | |
|
2428 | 2430 | """Run a complete IPython cell. |
|
2429 | 2431 | |
|
2430 | 2432 | Parameters |
@@ -2435,10 +2437,16 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||
|
2435 | 2437 | If True, the raw and translated cell will be stored in IPython's |
|
2436 | 2438 | history. For user code calling back into IPython's machinery, this |
|
2437 | 2439 | should be set to False. |
|
2440 | silent : bool | |
|
2441 | If True, avoid side-effets, such as implicit displayhooks, history, | |
|
2442 | and logging. silent=True forces store_history=False. | |
|
2438 | 2443 | """ |
|
2439 | 2444 | if (not raw_cell) or raw_cell.isspace(): |
|
2440 | 2445 | return |
|
2441 | 2446 | |
|
2447 | if silent: | |
|
2448 | store_history = False | |
|
2449 | ||
|
2442 | 2450 | for line in raw_cell.splitlines(): |
|
2443 | 2451 | self.input_splitter.push(line) |
|
2444 | 2452 | cell = self.input_splitter.source_reset() |
@@ -2462,7 +2470,7 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||
|
2462 | 2470 | if store_history: |
|
2463 | 2471 | self.history_manager.store_inputs(self.execution_count, |
|
2464 | 2472 | cell, raw_cell) |
|
2465 | ||
|
2473 | if not silent: | |
|
2466 | 2474 | self.logger.log(cell, raw_cell) |
|
2467 | 2475 | |
|
2468 | 2476 | if not prefilter_failed: |
@@ -2484,11 +2492,15 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||
|
2484 | 2492 | self.execution_count += 1 |
|
2485 | 2493 | return None |
|
2486 | 2494 | |
|
2495 | interactivity = "none" if silent else "last_expr" | |
|
2487 | 2496 | self.run_ast_nodes(code_ast.body, cell_name, |
|
2488 |
interactivity= |
|
|
2497 | interactivity=interactivity) | |
|
2489 | 2498 | |
|
2490 | 2499 | # Execute any registered post-execution functions. |
|
2491 | for func, status in self._post_execute.iteritems(): | |
|
2500 | # unless we are silent | |
|
2501 | post_exec = [] if silent else self._post_execute.iteritems() | |
|
2502 | ||
|
2503 | for func, status in post_exec: | |
|
2492 | 2504 | if self.disable_failing_post_execute and not status: |
|
2493 | 2505 | continue |
|
2494 | 2506 | try: |
@@ -255,6 +255,61 b' class InteractiveShellTestCase(unittest.TestCase):' | |||
|
255 | 255 | # ZeroDivisionError |
|
256 | 256 | self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}") |
|
257 | 257 | |
|
258 | def test_silent_nopostexec(self): | |
|
259 | """run_cell(silent=True) doesn't invoke post-exec funcs""" | |
|
260 | ip = get_ipython() | |
|
261 | ||
|
262 | d = dict(called=False) | |
|
263 | def set_called(): | |
|
264 | d['called'] = True | |
|
265 | ||
|
266 | ip.register_post_execute(set_called) | |
|
267 | ip.run_cell("1", silent=True) | |
|
268 | self.assertFalse(d['called']) | |
|
269 | # double-check that non-silent exec did what we expected | |
|
270 | # silent to avoid | |
|
271 | ip.run_cell("1") | |
|
272 | self.assertTrue(d['called']) | |
|
273 | # remove post-exec | |
|
274 | ip._post_execute.pop(set_called) | |
|
275 | ||
|
276 | def test_silent_noadvance(self): | |
|
277 | """run_cell(silent=True) doesn't advance execution_count""" | |
|
278 | ip = get_ipython() | |
|
279 | ||
|
280 | ec = ip.execution_count | |
|
281 | # silent should force store_history=False | |
|
282 | ip.run_cell("1", store_history=True, silent=True) | |
|
283 | ||
|
284 | self.assertEquals(ec, ip.execution_count) | |
|
285 | # double-check that non-silent exec did what we expected | |
|
286 | # silent to avoid | |
|
287 | ip.run_cell("1", store_history=True) | |
|
288 | self.assertEquals(ec+1, ip.execution_count) | |
|
289 | ||
|
290 | def test_silent_nodisplayhook(self): | |
|
291 | """run_cell(silent=True) doesn't trigger displayhook""" | |
|
292 | ip = get_ipython() | |
|
293 | ||
|
294 | d = dict(called=False) | |
|
295 | ||
|
296 | trap = ip.display_trap | |
|
297 | save_hook = trap.hook | |
|
298 | ||
|
299 | def failing_hook(*args, **kwargs): | |
|
300 | d['called'] = True | |
|
301 | ||
|
302 | try: | |
|
303 | trap.hook = failing_hook | |
|
304 | ip.run_cell("1", silent=True) | |
|
305 | self.assertFalse(d['called']) | |
|
306 | # double-check that non-silent exec did what we expected | |
|
307 | # silent to avoid | |
|
308 | ip.run_cell("1") | |
|
309 | self.assertTrue(d['called']) | |
|
310 | finally: | |
|
311 | trap.hook = save_hook | |
|
312 | ||
|
258 | 313 | @skipif(sys.version_info[0] >= 3, "softspace removed in py3") |
|
259 | 314 | def test_print_softspace(self): |
|
260 | 315 | """Verify that softspace is handled correctly when executing multiple |
@@ -211,7 +211,7 b' class ParalleMagic(Plugin):' | |||
|
211 | 211 | print '[stdout:%i]'%eid, stdout |
|
212 | 212 | |
|
213 | 213 | |
|
214 |
def pxrun_cell(self, raw_cell, store_history= |
|
|
214 | def pxrun_cell(self, raw_cell, store_history=False, silent=False): | |
|
215 | 215 | """drop-in replacement for InteractiveShell.run_cell. |
|
216 | 216 | |
|
217 | 217 | This executes code remotely, instead of in the local namespace. |
@@ -258,7 +258,7 b' class ParalleMagic(Plugin):' | |||
|
258 | 258 | return False |
|
259 | 259 | else: |
|
260 | 260 | try: |
|
261 | result = self.active_view.execute(cell, block=False) | |
|
261 | result = self.active_view.execute(cell, silent=False, block=False) | |
|
262 | 262 | except: |
|
263 | 263 | ipself.showtraceback() |
|
264 | 264 | return True |
@@ -140,8 +140,6 b' class IPythonConsoleApp(Configurable):' | |||
|
140 | 140 | frontend_flags = Any(app_flags) |
|
141 | 141 | frontend_aliases = Any(app_aliases) |
|
142 | 142 | |
|
143 | pure = CBool(False, config=True, | |
|
144 | help="Use a pure Python kernel instead of an IPython kernel.") | |
|
145 | 143 | # create requested profiles by default, if they don't exist: |
|
146 | 144 | auto_create = CBool(True) |
|
147 | 145 | # connection info: |
@@ -330,9 +328,7 b' class IPythonConsoleApp(Configurable):' | |||
|
330 | 328 | ) |
|
331 | 329 | # start the kernel |
|
332 | 330 | if not self.existing: |
|
333 | kwargs = dict(ipython=not self.pure) | |
|
334 | kwargs['extra_arguments'] = self.kernel_argv | |
|
335 | self.kernel_manager.start_kernel(**kwargs) | |
|
331 | self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv) | |
|
336 | 332 | elif self.sshserver: |
|
337 | 333 | # ssh, write new connection file |
|
338 | 334 | self.kernel_manager.write_connection_file() |
@@ -45,8 +45,6 b' class DummyIPClusterStart(IPClusterStart):' | |||
|
45 | 45 | |
|
46 | 46 | def init_signal(self): |
|
47 | 47 | pass |
|
48 | def init_logging(self): | |
|
49 | pass | |
|
50 | 48 | def reinit_logging(self): |
|
51 | 49 | pass |
|
52 | 50 |
@@ -30,6 +30,7 b' from zmq.utils import jsonapi' | |||
|
30 | 30 | from IPython.external.decorator import decorator |
|
31 | 31 | from IPython.zmq.session import Session |
|
32 | 32 | from IPython.lib.security import passwd_check |
|
33 | from IPython.utils.jsonutil import date_default | |
|
33 | 34 | |
|
34 | 35 | try: |
|
35 | 36 | from docutils.core import publish_string |
@@ -385,13 +386,13 b' class ZMQStreamHandler(websocket.WebSocketHandler):' | |||
|
385 | 386 | except KeyError: |
|
386 | 387 | pass |
|
387 | 388 | msg.pop('buffers') |
|
388 | return jsonapi.dumps(msg) | |
|
389 | return jsonapi.dumps(msg, default=date_default) | |
|
389 | 390 | |
|
390 | 391 | def _on_zmq_reply(self, msg_list): |
|
391 | 392 | try: |
|
392 | 393 | msg = self._reserialize_reply(msg_list) |
|
393 | except: | |
|
394 | self.application.log.critical("Malformed message: %r" % msg_list) | |
|
394 | except Exception: | |
|
395 | self.application.log.critical("Malformed message: %r" % msg_list, exc_info=True) | |
|
395 | 396 | else: |
|
396 | 397 | self.write_message(msg) |
|
397 | 398 |
@@ -397,7 +397,6 b' class NotebookApp(BaseIPythonApplication):' | |||
|
397 | 397 | self.cluster_manager.update_profiles() |
|
398 | 398 | |
|
399 | 399 | def init_logging(self): |
|
400 | super(NotebookApp, self).init_logging() | |
|
401 | 400 | # This prevents double log messages because tornado use a root logger that |
|
402 | 401 | # self.log is a child of. The logging module dipatches log messages to a log |
|
403 | 402 | # and all of its ancenstors until propagate is set to False. |
@@ -500,6 +499,7 b' class NotebookApp(BaseIPythonApplication):' | |||
|
500 | 499 | |
|
501 | 500 | @catch_config_error |
|
502 | 501 | def initialize(self, argv=None): |
|
502 | self.init_logging() | |
|
503 | 503 | super(NotebookApp, self).initialize(argv) |
|
504 | 504 | self.init_configurables() |
|
505 | 505 | self.init_webapp() |
@@ -101,9 +101,7 b' ipython qtconsole --pylab=inline # start with pylab in inline plotting mode' | |||
|
101 | 101 | # start with copy of flags |
|
102 | 102 | flags = dict(flags) |
|
103 | 103 | qt_flags = { |
|
104 |
'p |
|
|
105 | "Use a pure Python kernel instead of an IPython kernel."), | |
|
106 | 'plain' : ({'ConsoleWidget' : {'kind' : 'plain'}}, | |
|
104 | 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}}, | |
|
107 | 105 | "Disable rich text support."), |
|
108 | 106 | } |
|
109 | 107 | qt_flags.update(boolean_flag( |
@@ -180,18 +178,14 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):' | |||
|
180 | 178 | plain = CBool(False, config=True, |
|
181 | 179 | help="Use a plaintext widget instead of rich text (plain can't print/save).") |
|
182 | 180 | |
|
183 |
def _p |
|
|
184 |
kind = 'plain' if |
|
|
181 | def _plain_changed(self, name, old, new): | |
|
182 | kind = 'plain' if new else 'rich' | |
|
185 | 183 | self.config.ConsoleWidget.kind = kind |
|
186 |
if |
|
|
187 | self.widget_factory = FrontendWidget | |
|
188 | elif self.plain: | |
|
184 | if new: | |
|
189 | 185 | self.widget_factory = IPythonWidget |
|
190 | 186 | else: |
|
191 | 187 | self.widget_factory = RichIPythonWidget |
|
192 | 188 | |
|
193 | _plain_changed = _pure_changed | |
|
194 | ||
|
195 | 189 | # the factory for creating a widget |
|
196 | 190 | widget_factory = Any(RichIPythonWidget) |
|
197 | 191 | |
@@ -210,7 +204,7 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):' | |||
|
210 | 204 | config=self.config, |
|
211 | 205 | ) |
|
212 | 206 | # start the kernel |
|
213 |
kwargs = dict( |
|
|
207 | kwargs = dict() | |
|
214 | 208 | kwargs['extra_arguments'] = self.kernel_argv |
|
215 | 209 | kernel_manager.start_kernel(**kwargs) |
|
216 | 210 | kernel_manager.start_channels() |
@@ -273,17 +267,13 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):' | |||
|
273 | 267 | self.window.add_tab_with_frontend(self.widget) |
|
274 | 268 | self.window.init_menu_bar() |
|
275 | 269 | |
|
276 |
self.window.setWindowTitle(' |
|
|
270 | self.window.setWindowTitle('IPython') | |
|
277 | 271 | |
|
278 | 272 | def init_colors(self, widget): |
|
279 | 273 | """Configure the coloring of the widget""" |
|
280 | 274 | # Note: This will be dramatically simplified when colors |
|
281 | 275 | # are removed from the backend. |
|
282 | 276 | |
|
283 | if self.pure: | |
|
284 | # only IPythonWidget supports styling | |
|
285 | return | |
|
286 | ||
|
287 | 277 | # parse the colors arg down to current known labels |
|
288 | 278 | try: |
|
289 | 279 | colors = self.config.ZMQInteractiveShell.colors |
@@ -46,11 +46,11 b' def get_connection_file(app=None):' | |||
|
46 | 46 | If unspecified, the currently running app will be used |
|
47 | 47 | """ |
|
48 | 48 | if app is None: |
|
49 |
from IPython.zmq.kernel |
|
|
50 | if not KernelApp.initialized(): | |
|
49 | from IPython.zmq.ipkernel import IPKernelApp | |
|
50 | if not IPKernelApp.initialized(): | |
|
51 | 51 | raise RuntimeError("app not specified, and not in a running Kernel") |
|
52 | 52 | |
|
53 | app = KernelApp.instance() | |
|
53 | app = IPKernelApp.instance() | |
|
54 | 54 | return filefind(app.connection_file, ['.', app.profile_dir.security_dir]) |
|
55 | 55 | |
|
56 | 56 | def find_connection_file(filename, profile=None): |
@@ -35,6 +35,28 b' from .client.asyncresult import *' | |||
|
35 | 35 | from .client.client import Client |
|
36 | 36 | from .client.remotefunction import * |
|
37 | 37 | from .client.view import * |
|
38 | from .util import interactive | |
|
38 | 39 | from .controller.dependency import * |
|
39 | 40 | |
|
41 | #----------------------------------------------------------------------------- | |
|
42 | # Functions | |
|
43 | #----------------------------------------------------------------------------- | |
|
44 | ||
|
45 | ||
|
46 | def bind_kernel(**kwargs): | |
|
47 | """Bind an Engine's Kernel to be used as a full IPython kernel. | |
|
48 | ||
|
49 | This allows a running Engine to be used simultaneously as a full IPython kernel | |
|
50 | with the QtConsole or other frontends. | |
|
51 | ||
|
52 | This function returns immediately. | |
|
53 | """ | |
|
54 | from IPython.parallel.apps.ipengineapp import IPEngineApp | |
|
55 | if IPEngineApp.initialized(): | |
|
56 | app = IPEngineApp.instance() | |
|
57 | else: | |
|
58 | raise RuntimeError("Must be called from an IPEngineApp instance") | |
|
59 | ||
|
60 | return app.bind_kernel(**kwargs) | |
|
61 | ||
|
40 | 62 |
@@ -103,6 +103,10 b' class BaseParallelApplication(BaseIPythonApplication):' | |||
|
103 | 103 | # temporarily override default_log_level to INFO |
|
104 | 104 | return logging.INFO |
|
105 | 105 | |
|
106 | def _log_format_default(self): | |
|
107 | """override default log format to include time""" | |
|
108 | return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s" | |
|
109 | ||
|
106 | 110 | work_dir = Unicode(os.getcwdu(), config=True, |
|
107 | 111 | help='Set the working dir for the process.' |
|
108 | 112 | ) |
@@ -175,11 +179,14 b' class BaseParallelApplication(BaseIPythonApplication):' | |||
|
175 | 179 | else: |
|
176 | 180 | open_log_file = None |
|
177 | 181 | if open_log_file is not None: |
|
178 |
self.log. |
|
|
182 | while self.log.handlers: | |
|
183 | self.log.removeHandler(self.log.handlers[0]) | |
|
179 | 184 | self._log_handler = logging.StreamHandler(open_log_file) |
|
180 | 185 | self.log.addHandler(self._log_handler) |
|
186 | else: | |
|
187 | self._log_handler = self.log.handlers[0] | |
|
181 | 188 | # Add timestamps to log format: |
|
182 |
self._log_formatter = logging.Formatter( |
|
|
189 | self._log_formatter = logging.Formatter(self.log_format, | |
|
183 | 190 | datefmt="%Y-%m-%d %H:%M:%S") |
|
184 | 191 | self._log_handler.setFormatter(self._log_formatter) |
|
185 | 192 | # do not propagate log messages to root logger |
@@ -434,11 +434,9 b' class IPControllerApp(BaseParallelApplication):' | |||
|
434 | 434 | lsock = context.socket(zmq.PUB) |
|
435 | 435 | lsock.connect(self.log_url) |
|
436 | 436 | handler = PUBHandler(lsock) |
|
437 | self.log.removeHandler(self._log_handler) | |
|
438 | 437 | handler.root_topic = 'controller' |
|
439 | 438 | handler.setLevel(self.log_level) |
|
440 | 439 | self.log.addHandler(handler) |
|
441 | self._log_handler = handler | |
|
442 | 440 | |
|
443 | 441 | @catch_config_error |
|
444 | 442 | def initialize(self, argv=None): |
@@ -37,6 +37,7 b' from IPython.parallel.apps.baseapp import (' | |||
|
37 | 37 | catch_config_error, |
|
38 | 38 | ) |
|
39 | 39 | from IPython.zmq.log import EnginePUBHandler |
|
40 | from IPython.zmq.ipkernel import Kernel, IPKernelApp | |
|
40 | 41 | from IPython.zmq.session import ( |
|
41 | 42 | Session, session_aliases, session_flags |
|
42 | 43 | ) |
@@ -44,11 +45,11 b' from IPython.zmq.session import (' | |||
|
44 | 45 | from IPython.config.configurable import Configurable |
|
45 | 46 | |
|
46 | 47 | from IPython.parallel.engine.engine import EngineFactory |
|
47 |
from IPython.parallel. |
|
|
48 | from IPython.parallel.util import disambiguate_url, asbytes | |
|
48 | from IPython.parallel.util import disambiguate_url | |
|
49 | 49 | |
|
50 | 50 | from IPython.utils.importstring import import_item |
|
51 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float | |
|
51 | from IPython.utils.py3compat import cast_bytes | |
|
52 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance | |
|
52 | 53 | |
|
53 | 54 | |
|
54 | 55 | #----------------------------------------------------------------------------- |
@@ -174,9 +175,17 b' class IPEngineApp(BaseParallelApplication):' | |||
|
174 | 175 | help="""The URL for the iploggerapp instance, for forwarding |
|
175 | 176 | logging to a central location.""") |
|
176 | 177 | |
|
178 | # an IPKernelApp instance, used to setup listening for shell frontends | |
|
179 | kernel_app = Instance(IPKernelApp) | |
|
180 | ||
|
177 | 181 | aliases = Dict(aliases) |
|
178 | 182 | flags = Dict(flags) |
|
179 | 183 | |
|
184 | @property | |
|
185 | def kernel(self): | |
|
186 | """allow access to the Kernel object, so I look like IPKernelApp""" | |
|
187 | return self.engine.kernel | |
|
188 | ||
|
180 | 189 | def find_url_file(self): |
|
181 | 190 | """Set the url file. |
|
182 | 191 | |
@@ -203,7 +212,7 b' class IPEngineApp(BaseParallelApplication):' | |||
|
203 | 212 | d = json.loads(f.read()) |
|
204 | 213 | |
|
205 | 214 | if 'exec_key' in d: |
|
206 | config.Session.key = asbytes(d['exec_key']) | |
|
215 | config.Session.key = cast_bytes(d['exec_key']) | |
|
207 | 216 | |
|
208 | 217 | try: |
|
209 | 218 | config.EngineFactory.location |
@@ -221,6 +230,46 b' class IPEngineApp(BaseParallelApplication):' | |||
|
221 | 230 | except AttributeError: |
|
222 | 231 | config.EngineFactory.sshserver = d['ssh'] |
|
223 | 232 |
|
|
233 | def bind_kernel(self, **kwargs): | |
|
234 | """Promote engine to listening kernel, accessible to frontends.""" | |
|
235 | if self.kernel_app is not None: | |
|
236 | return | |
|
237 | ||
|
238 | self.log.info("Opening ports for direct connections as an IPython kernel") | |
|
239 | ||
|
240 | kernel = self.kernel | |
|
241 | ||
|
242 | kwargs.setdefault('config', self.config) | |
|
243 | kwargs.setdefault('log', self.log) | |
|
244 | kwargs.setdefault('profile_dir', self.profile_dir) | |
|
245 | kwargs.setdefault('session', self.engine.session) | |
|
246 | ||
|
247 | app = self.kernel_app = IPKernelApp(**kwargs) | |
|
248 | ||
|
249 | # allow IPKernelApp.instance(): | |
|
250 | IPKernelApp._instance = app | |
|
251 | ||
|
252 | app.init_connection_file() | |
|
253 | # relevant contents of init_sockets: | |
|
254 | ||
|
255 | app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port) | |
|
256 | app.log.debug("shell ROUTER Channel on port: %i", app.shell_port) | |
|
257 | ||
|
258 | app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port) | |
|
259 | app.log.debug("iopub PUB Channel on port: %i", app.iopub_port) | |
|
260 | ||
|
261 | kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER) | |
|
262 | app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port) | |
|
263 | app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port) | |
|
264 | ||
|
265 | # start the heartbeat, and log connection info: | |
|
266 | ||
|
267 | app.init_heartbeat() | |
|
268 | ||
|
269 | app.log_connection_info() | |
|
270 | app.write_connection_file() | |
|
271 | ||
|
272 | ||
|
224 | 273 | def init_engine(self): |
|
225 | 274 | # This is the working dir by now. |
|
226 | 275 | sys.path.insert(0, '') |
@@ -282,11 +331,9 b' class IPEngineApp(BaseParallelApplication):' | |||
|
282 | 331 | context = self.engine.context |
|
283 | 332 | lsock = context.socket(zmq.PUB) |
|
284 | 333 | lsock.connect(self.log_url) |
|
285 | self.log.removeHandler(self._log_handler) | |
|
286 | 334 | handler = EnginePUBHandler(self.engine, lsock) |
|
287 | 335 | handler.setLevel(self.log_level) |
|
288 | 336 | self.log.addHandler(handler) |
|
289 | self._log_handler = handler | |
|
290 | 337 | |
|
291 | 338 | def init_mpi(self): |
|
292 | 339 | global mpi |
@@ -23,6 +23,7 b' import copy' | |||
|
23 | 23 | import logging |
|
24 | 24 | import os |
|
25 | 25 | import stat |
|
26 | import sys | |
|
26 | 27 | import time |
|
27 | 28 | |
|
28 | 29 | # signal imports, handling various platforms, versions |
@@ -59,8 +60,8 b' from IPython.utils.text import EvalFormatter' | |||
|
59 | 60 | from IPython.utils.traitlets import ( |
|
60 | 61 | Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp |
|
61 | 62 | ) |
|
62 |
from IPython.utils.path import |
|
|
63 |
from IPython.utils.process import find_cmd, |
|
|
63 | from IPython.utils.path import get_home_dir | |
|
64 | from IPython.utils.process import find_cmd, FindCmdError | |
|
64 | 65 | |
|
65 | 66 | from .win32support import forward_read_events |
|
66 | 67 | |
@@ -72,18 +73,13 b" WINDOWS = os.name == 'nt'" | |||
|
72 | 73 | # Paths to the kernel apps |
|
73 | 74 | #----------------------------------------------------------------------------- |
|
74 | 75 | |
|
76 | cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()" | |
|
75 | 77 | |
|
76 | ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path( | |
|
77 | 'IPython.parallel.apps.ipclusterapp' | |
|
78 | )) | |
|
78 | ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"] | |
|
79 | 79 | |
|
80 | ipengine_cmd_argv = pycmd2argv(get_ipython_module_path( | |
|
81 | 'IPython.parallel.apps.ipengineapp' | |
|
82 | )) | |
|
80 | ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"] | |
|
83 | 81 | |
|
84 | ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path( | |
|
85 | 'IPython.parallel.apps.ipcontrollerapp' | |
|
86 | )) | |
|
82 | ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"] | |
|
87 | 83 | |
|
88 | 84 | #----------------------------------------------------------------------------- |
|
89 | 85 | # Base launchers and errors |
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication' | |||
|
36 | 36 | from IPython.utils.jsonutil import rekey |
|
37 | 37 | from IPython.utils.localinterfaces import LOCAL_IPS |
|
38 | 38 | from IPython.utils.path import get_ipython_dir |
|
39 | from IPython.utils.py3compat import cast_bytes | |
|
39 | 40 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, |
|
40 | 41 | Dict, List, Bool, Set, Any) |
|
41 | 42 | from IPython.external.decorator import decorator |
@@ -71,6 +72,60 b' def spin_first(f, self, *args, **kwargs):' | |||
|
71 | 72 | # Classes |
|
72 | 73 | #-------------------------------------------------------------------------- |
|
73 | 74 | |
|
75 | ||
|
76 | class ExecuteReply(object): | |
|
77 | """wrapper for finished Execute results""" | |
|
78 | def __init__(self, msg_id, content, metadata): | |
|
79 | self.msg_id = msg_id | |
|
80 | self._content = content | |
|
81 | self.execution_count = content['execution_count'] | |
|
82 | self.metadata = metadata | |
|
83 | ||
|
84 | def __getitem__(self, key): | |
|
85 | return self.metadata[key] | |
|
86 | ||
|
87 | def __getattr__(self, key): | |
|
88 | if key not in self.metadata: | |
|
89 | raise AttributeError(key) | |
|
90 | return self.metadata[key] | |
|
91 | ||
|
92 | def __repr__(self): | |
|
93 | pyout = self.metadata['pyout'] or {} | |
|
94 | text_out = pyout.get('data', {}).get('text/plain', '') | |
|
95 | if len(text_out) > 32: | |
|
96 | text_out = text_out[:29] + '...' | |
|
97 | ||
|
98 | return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out) | |
|
99 | ||
|
100 | def _repr_html_(self): | |
|
101 | pyout = self.metadata['pyout'] or {'data':{}} | |
|
102 | return pyout['data'].get("text/html") | |
|
103 | ||
|
104 | def _repr_latex_(self): | |
|
105 | pyout = self.metadata['pyout'] or {'data':{}} | |
|
106 | return pyout['data'].get("text/latex") | |
|
107 | ||
|
108 | def _repr_json_(self): | |
|
109 | pyout = self.metadata['pyout'] or {'data':{}} | |
|
110 | return pyout['data'].get("application/json") | |
|
111 | ||
|
112 | def _repr_javascript_(self): | |
|
113 | pyout = self.metadata['pyout'] or {'data':{}} | |
|
114 | return pyout['data'].get("application/javascript") | |
|
115 | ||
|
116 | def _repr_png_(self): | |
|
117 | pyout = self.metadata['pyout'] or {'data':{}} | |
|
118 | return pyout['data'].get("image/png") | |
|
119 | ||
|
120 | def _repr_jpeg_(self): | |
|
121 | pyout = self.metadata['pyout'] or {'data':{}} | |
|
122 | return pyout['data'].get("image/jpeg") | |
|
123 | ||
|
124 | def _repr_svg_(self): | |
|
125 | pyout = self.metadata['pyout'] or {'data':{}} | |
|
126 | return pyout['data'].get("image/svg+xml") | |
|
127 | ||
|
128 | ||
|
74 | 129 | class Metadata(dict): |
|
75 | 130 | """Subclass of dict for initializing metadata values. |
|
76 | 131 | |
@@ -97,6 +152,7 b' class Metadata(dict):' | |||
|
97 | 152 | 'pyerr' : None, |
|
98 | 153 | 'stdout' : '', |
|
99 | 154 | 'stderr' : '', |
|
155 | 'outputs' : [], | |
|
100 | 156 | } |
|
101 | 157 | self.update(md) |
|
102 | 158 | self.update(dict(*args, **kwargs)) |
@@ -308,15 +364,19 b' class Client(HasTraits):' | |||
|
308 | 364 | if self._cd is not None: |
|
309 | 365 | if url_or_file is None: |
|
310 | 366 | url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') |
|
311 | assert url_or_file is not None, "I can't find enough information to connect to a hub!"\ | |
|
367 | if url_or_file is None: | |
|
368 | raise ValueError( | |
|
369 | "I can't find enough information to connect to a hub!" | |
|
312 | 370 | " Please specify at least one of url_or_file or profile." |
|
371 | ) | |
|
313 | 372 | |
|
314 | 373 | if not util.is_url(url_or_file): |
|
315 | 374 | # it's not a url, try for a file |
|
316 | 375 | if not os.path.exists(url_or_file): |
|
317 | 376 | if self._cd: |
|
318 | 377 | url_or_file = os.path.join(self._cd.security_dir, url_or_file) |
|
319 | assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file | |
|
378 | if not os.path.exists(url_or_file): | |
|
379 | raise IOError("Connection file not found: %r" % url_or_file) | |
|
320 | 380 | with open(url_or_file) as f: |
|
321 | 381 | cfg = json.loads(f.read()) |
|
322 | 382 | else: |
@@ -369,7 +429,7 b' class Client(HasTraits):' | |||
|
369 | 429 | if os.path.isfile(exec_key): |
|
370 | 430 | extra_args['keyfile'] = exec_key |
|
371 | 431 | else: |
|
372 |
exec_key = |
|
|
432 | exec_key = cast_bytes(exec_key) | |
|
373 | 433 | extra_args['key'] = exec_key |
|
374 | 434 | self.session = Session(**extra_args) |
|
375 | 435 | |
@@ -467,7 +527,7 b' class Client(HasTraits):' | |||
|
467 | 527 | if not isinstance(targets, (tuple, list, xrange)): |
|
468 | 528 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) |
|
469 | 529 | |
|
470 |
return [ |
|
|
530 | return [cast_bytes(self._engines[t]) for t in targets], list(targets) | |
|
471 | 531 | |
|
472 | 532 | def _connect(self, sshserver, ssh_kwargs, timeout): |
|
473 | 533 | """setup all our socket connections to the cluster. This is called from |
@@ -628,7 +688,30 b' class Client(HasTraits):' | |||
|
628 | 688 | print ("got unknown result: %s"%msg_id) |
|
629 | 689 | else: |
|
630 | 690 | self.outstanding.remove(msg_id) |
|
631 | self.results[msg_id] = self._unwrap_exception(msg['content']) | |
|
691 | ||
|
692 | content = msg['content'] | |
|
693 | header = msg['header'] | |
|
694 | ||
|
695 | # construct metadata: | |
|
696 | md = self.metadata[msg_id] | |
|
697 | md.update(self._extract_metadata(header, parent, content)) | |
|
698 | # is this redundant? | |
|
699 | self.metadata[msg_id] = md | |
|
700 | ||
|
701 | e_outstanding = self._outstanding_dict[md['engine_uuid']] | |
|
702 | if msg_id in e_outstanding: | |
|
703 | e_outstanding.remove(msg_id) | |
|
704 | ||
|
705 | # construct result: | |
|
706 | if content['status'] == 'ok': | |
|
707 | self.results[msg_id] = ExecuteReply(msg_id, content, md) | |
|
708 | elif content['status'] == 'aborted': | |
|
709 | self.results[msg_id] = error.TaskAborted(msg_id) | |
|
710 | elif content['status'] == 'resubmitted': | |
|
711 | # TODO: handle resubmission | |
|
712 | pass | |
|
713 | else: | |
|
714 | self.results[msg_id] = self._unwrap_exception(content) | |
|
632 | 715 | |
|
633 | 716 | def _handle_apply_reply(self, msg): |
|
634 | 717 | """Save the reply to an apply_request into our results.""" |
@@ -750,8 +833,13 b' class Client(HasTraits):' | |||
|
750 | 833 | md.update({'pyerr' : self._unwrap_exception(content)}) |
|
751 | 834 | elif msg_type == 'pyin': |
|
752 | 835 | md.update({'pyin' : content['code']}) |
|
836 | elif msg_type == 'display_data': | |
|
837 | md['outputs'].append(content) | |
|
838 | elif msg_type == 'pyout': | |
|
839 | md['pyout'] = content | |
|
753 | 840 | else: |
|
754 | md.update({msg_type : content.get('data', '')}) | |
|
841 | # unhandled msg_type (status, etc.) | |
|
842 | pass | |
|
755 | 843 | |
|
756 | 844 | # reduntant? |
|
757 | 845 | self.metadata[msg_id] = md |
@@ -848,14 +936,14 b' class Client(HasTraits):' | |||
|
848 | 936 | """ |
|
849 | 937 | if self._notification_socket: |
|
850 | 938 | self._flush_notifications() |
|
939 | if self._iopub_socket: | |
|
940 | self._flush_iopub(self._iopub_socket) | |
|
851 | 941 | if self._mux_socket: |
|
852 | 942 | self._flush_results(self._mux_socket) |
|
853 | 943 | if self._task_socket: |
|
854 | 944 | self._flush_results(self._task_socket) |
|
855 | 945 | if self._control_socket: |
|
856 | 946 | self._flush_control(self._control_socket) |
|
857 | if self._iopub_socket: | |
|
858 | self._flush_iopub(self._iopub_socket) | |
|
859 | 947 | if self._query_socket: |
|
860 | 948 | self._flush_ignored_hub_replies() |
|
861 | 949 | |
@@ -1024,14 +1112,16 b' class Client(HasTraits):' | |||
|
1024 | 1112 | |
|
1025 | 1113 | return result |
|
1026 | 1114 | |
|
1027 |
def send_apply_ |
|
|
1115 | def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False, | |
|
1028 | 1116 | ident=None): |
|
1029 | 1117 | """construct and send an apply message via a socket. |
|
1030 | 1118 | |
|
1031 | 1119 | This is the principal method with which all engine execution is performed by views. |
|
1032 | 1120 | """ |
|
1033 | 1121 | |
|
1034 | assert not self._closed, "cannot use me anymore, I'm closed!" | |
|
1122 | if self._closed: | |
|
1123 | raise RuntimeError("Client cannot be used after its sockets have been closed") | |
|
1124 | ||
|
1035 | 1125 | # defaults: |
|
1036 | 1126 | args = args if args is not None else [] |
|
1037 | 1127 | kwargs = kwargs if kwargs is not None else {} |
@@ -1066,6 +1156,43 b' class Client(HasTraits):' | |||
|
1066 | 1156 | |
|
1067 | 1157 | return msg |
|
1068 | 1158 | |
|
1159 | def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None): | |
|
1160 | """construct and send an execute request via a socket. | |
|
1161 | ||
|
1162 | """ | |
|
1163 | ||
|
1164 | if self._closed: | |
|
1165 | raise RuntimeError("Client cannot be used after its sockets have been closed") | |
|
1166 | ||
|
1167 | # defaults: | |
|
1168 | subheader = subheader if subheader is not None else {} | |
|
1169 | ||
|
1170 | # validate arguments | |
|
1171 | if not isinstance(code, basestring): | |
|
1172 | raise TypeError("code must be text, not %s" % type(code)) | |
|
1173 | if not isinstance(subheader, dict): | |
|
1174 | raise TypeError("subheader must be dict, not %s" % type(subheader)) | |
|
1175 | ||
|
1176 | content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) | |
|
1177 | ||
|
1178 | ||
|
1179 | msg = self.session.send(socket, "execute_request", content=content, ident=ident, | |
|
1180 | subheader=subheader) | |
|
1181 | ||
|
1182 | msg_id = msg['header']['msg_id'] | |
|
1183 | self.outstanding.add(msg_id) | |
|
1184 | if ident: | |
|
1185 | # possibly routed to a specific engine | |
|
1186 | if isinstance(ident, list): | |
|
1187 | ident = ident[-1] | |
|
1188 | if ident in self._engines.values(): | |
|
1189 | # save for later, in case of engine death | |
|
1190 | self._outstanding_dict[ident].add(msg_id) | |
|
1191 | self.history.append(msg_id) | |
|
1192 | self.metadata[msg_id]['submitted'] = datetime.now() | |
|
1193 | ||
|
1194 | return msg | |
|
1195 | ||
|
1069 | 1196 | #-------------------------------------------------------------------------- |
|
1070 | 1197 | # construct a View object |
|
1071 | 1198 | #-------------------------------------------------------------------------- |
@@ -1221,12 +1348,6 b' class Client(HasTraits):' | |||
|
1221 | 1348 | raise TypeError("indices must be str or int, not %r"%id) |
|
1222 | 1349 | theids.append(id) |
|
1223 | 1350 | |
|
1224 | for msg_id in theids: | |
|
1225 | self.outstanding.discard(msg_id) | |
|
1226 | if msg_id in self.history: | |
|
1227 | self.history.remove(msg_id) | |
|
1228 | self.results.pop(msg_id, None) | |
|
1229 | self.metadata.pop(msg_id, None) | |
|
1230 | 1351 | content = dict(msg_ids = theids) |
|
1231 | 1352 | |
|
1232 | 1353 | self.session.send(self._query_socket, 'resubmit_request', content) |
@@ -1238,8 +1359,10 b' class Client(HasTraits):' | |||
|
1238 | 1359 | content = msg['content'] |
|
1239 | 1360 | if content['status'] != 'ok': |
|
1240 | 1361 | raise self._unwrap_exception(content) |
|
1362 | mapping = content['resubmitted'] | |
|
1363 | new_ids = [ mapping[msg_id] for msg_id in theids ] | |
|
1241 | 1364 | |
|
1242 |
ar = AsyncHubResult(self, msg_ids= |
|
|
1365 | ar = AsyncHubResult(self, msg_ids=new_ids) | |
|
1243 | 1366 | |
|
1244 | 1367 | if block: |
|
1245 | 1368 | ar.wait() |
@@ -128,13 +128,20 b' class View(HasTraits):' | |||
|
128 | 128 | |
|
129 | 129 | assert not self.__class__ is View, "Don't use base View objects, use subclasses" |
|
130 | 130 | |
|
131 | ||
|
132 | 131 | def __repr__(self): |
|
133 | 132 | strtargets = str(self.targets) |
|
134 | 133 | if len(strtargets) > 16: |
|
135 | 134 | strtargets = strtargets[:12]+'...]' |
|
136 | 135 | return "<%s %s>"%(self.__class__.__name__, strtargets) |
|
137 | 136 | |
|
137 | def __len__(self): | |
|
138 | if isinstance(self.targets, list): | |
|
139 | return len(self.targets) | |
|
140 | elif isinstance(self.targets, int): | |
|
141 | return 1 | |
|
142 | else: | |
|
143 | return len(self.client) | |
|
144 | ||
|
138 | 145 | def set_flags(self, **kwargs): |
|
139 | 146 | """set my attribute flags by keyword. |
|
140 | 147 | |
@@ -195,7 +202,7 b' class View(HasTraits):' | |||
|
195 | 202 | @sync_results |
|
196 | 203 | @save_ids |
|
197 | 204 | def _really_apply(self, f, args, kwargs, block=None, **options): |
|
198 |
"""wrapper for client.send_apply_ |
|
|
205 | """wrapper for client.send_apply_request""" | |
|
199 | 206 | raise NotImplementedError("Implement in subclasses") |
|
200 | 207 | |
|
201 | 208 | def apply(self, f, *args, **kwargs): |
@@ -533,7 +540,7 b' class DirectView(View):' | |||
|
533 | 540 | msg_ids = [] |
|
534 | 541 | trackers = [] |
|
535 | 542 | for ident in _idents: |
|
536 |
msg = self.client.send_apply_ |
|
|
543 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, | |
|
537 | 544 | ident=ident) |
|
538 | 545 | if track: |
|
539 | 546 | trackers.append(msg['tracker']) |
@@ -547,6 +554,7 b' class DirectView(View):' | |||
|
547 | 554 | pass |
|
548 | 555 | return ar |
|
549 | 556 | |
|
557 | ||
|
550 | 558 | @spin_after |
|
551 | 559 | def map(self, f, *sequences, **kwargs): |
|
552 | 560 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult |
@@ -590,7 +598,9 b' class DirectView(View):' | |||
|
590 | 598 | pf = ParallelFunction(self, f, block=block, **kwargs) |
|
591 | 599 | return pf.map(*sequences) |
|
592 | 600 | |
|
593 | def execute(self, code, targets=None, block=None): | |
|
601 | @sync_results | |
|
602 | @save_ids | |
|
603 | def execute(self, code, silent=True, targets=None, block=None): | |
|
594 | 604 | """Executes `code` on `targets` in blocking or nonblocking manner. |
|
595 | 605 | |
|
596 | 606 | ``execute`` is always `bound` (affects engine namespace) |
@@ -604,7 +614,22 b' class DirectView(View):' | |||
|
604 | 614 | whether or not to wait until done to return |
|
605 | 615 | default: self.block |
|
606 | 616 | """ |
|
607 | return self._really_apply(util._execute, args=(code,), block=block, targets=targets) | |
|
617 | block = self.block if block is None else block | |
|
618 | targets = self.targets if targets is None else targets | |
|
619 | ||
|
620 | _idents = self.client._build_targets(targets)[0] | |
|
621 | msg_ids = [] | |
|
622 | trackers = [] | |
|
623 | for ident in _idents: | |
|
624 | msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident) | |
|
625 | msg_ids.append(msg['header']['msg_id']) | |
|
626 | ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets) | |
|
627 | if block: | |
|
628 | try: | |
|
629 | ar.get() | |
|
630 | except KeyboardInterrupt: | |
|
631 | pass | |
|
632 | return ar | |
|
608 | 633 | |
|
609 | 634 | def run(self, filename, targets=None, block=None): |
|
610 | 635 | """Execute contents of `filename` on my engine(s). |
@@ -996,7 +1021,7 b' class LoadBalancedView(View):' | |||
|
996 | 1021 | follow = self._render_dependency(follow) |
|
997 | 1022 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) |
|
998 | 1023 | |
|
999 |
msg = self.client.send_apply_ |
|
|
1024 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, | |
|
1000 | 1025 | subheader=subheader) |
|
1001 | 1026 | tracker = None if track is False else msg['tracker'] |
|
1002 | 1027 |
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice' | |||
|
23 | 23 | from zmq.eventloop import ioloop, zmqstream |
|
24 | 24 | |
|
25 | 25 | from IPython.config.configurable import LoggingConfigurable |
|
26 | from IPython.utils.py3compat import str_to_bytes | |
|
26 | 27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer |
|
27 | 28 | |
|
28 |
from IPython.parallel.util import |
|
|
29 | from IPython.parallel.util import log_errors | |
|
29 | 30 | |
|
30 | 31 | class Heart(object): |
|
31 | 32 | """A basic heart object for responding to a HeartMonitor. |
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):' | |||
|
123 | 124 | self.responses = set() |
|
124 | 125 | # print self.on_probation, self.hearts |
|
125 | 126 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
126 |
self.pingstream.send( |
|
|
127 | self.pingstream.send(str_to_bytes(str(self.lifetime))) | |
|
127 | 128 | # flush stream to force immediate socket send |
|
128 | 129 | self.pingstream.flush() |
|
129 | 130 | |
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):' | |||
|
151 | 152 | @log_errors |
|
152 | 153 | def handle_pong(self, msg): |
|
153 | 154 | "a heart just beat" |
|
154 |
current = |
|
|
155 |
last = |
|
|
155 | current = str_to_bytes(str(self.lifetime)) | |
|
156 | last = str_to_bytes(str(self.last_ping)) | |
|
156 | 157 | if msg[1] == current: |
|
157 | 158 | delta = time.time()-self.tic |
|
158 | 159 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream' | |||
|
28 | 28 | |
|
29 | 29 | # internal: |
|
30 | 30 | from IPython.utils.importstring import import_item |
|
31 | from IPython.utils.py3compat import cast_bytes | |
|
31 | 32 | from IPython.utils.traitlets import ( |
|
32 | 33 | HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName |
|
33 | 34 | ) |
@@ -441,7 +442,7 b' class Hub(SessionFactory):' | |||
|
441 | 442 | for t in targets: |
|
442 | 443 | # map raw identities to ids |
|
443 | 444 | if isinstance(t, (str,unicode)): |
|
444 | t = self.by_ident.get(t, t) | |
|
445 | t = self.by_ident.get(cast_bytes(t), t) | |
|
445 | 446 | _targets.append(t) |
|
446 | 447 | targets = _targets |
|
447 | 448 | bad_targets = [ t for t in targets if t not in self.ids ] |
@@ -467,13 +468,13 b' class Hub(SessionFactory):' | |||
|
467 | 468 | except ValueError: |
|
468 | 469 | idents=[] |
|
469 | 470 | if not idents: |
|
470 |
self.log.error(" |
|
|
471 | self.log.error("Monitor message without topic: %r", msg) | |
|
471 | 472 | return |
|
472 | 473 | handler = self.monitor_handlers.get(switch, None) |
|
473 | 474 | if handler is not None: |
|
474 | 475 | handler(idents, msg) |
|
475 | 476 | else: |
|
476 |
self.log.error(" |
|
|
477 | self.log.error("Unrecognized monitor topic: %r", switch) | |
|
477 | 478 | |
|
478 | 479 | |
|
479 | 480 | @util.log_errors |
@@ -719,14 +720,17 b' class Hub(SessionFactory):' | |||
|
719 | 720 | self.unassigned.remove(msg_id) |
|
720 | 721 | |
|
721 | 722 | header = msg['header'] |
|
722 |
engine_uuid = header.get('engine', |
|
|
723 | eid = self.by_ident.get(engine_uuid, None) | |
|
723 | engine_uuid = header.get('engine', u'') | |
|
724 | eid = self.by_ident.get(cast_bytes(engine_uuid), None) | |
|
725 | ||
|
726 | status = header.get('status', None) | |
|
724 | 727 | |
|
725 | 728 | if msg_id in self.pending: |
|
726 | 729 | self.log.info("task::task %r finished on %s", msg_id, eid) |
|
727 | 730 | self.pending.remove(msg_id) |
|
728 | 731 | self.all_completed.add(msg_id) |
|
729 | 732 | if eid is not None: |
|
733 | if status != 'aborted': | |
|
730 | 734 | self.completed[eid].append(msg_id) |
|
731 | 735 | if msg_id in self.tasks[eid]: |
|
732 | 736 | self.tasks[eid].remove(msg_id) |
@@ -760,7 +764,7 b' class Hub(SessionFactory):' | |||
|
760 | 764 | # print (content) |
|
761 | 765 | msg_id = content['msg_id'] |
|
762 | 766 | engine_uuid = content['engine_id'] |
|
763 |
eid = self.by_ident[ |
|
|
767 | eid = self.by_ident[cast_bytes(engine_uuid)] | |
|
764 | 768 | |
|
765 | 769 | self.log.info("task::task %r arrived on %r", msg_id, eid) |
|
766 | 770 | if msg_id in self.unassigned: |
@@ -796,7 +800,7 b' class Hub(SessionFactory):' | |||
|
796 | 800 | |
|
797 | 801 | parent = msg['parent_header'] |
|
798 | 802 | if not parent: |
|
799 |
self.log. |
|
|
803 | self.log.warn("iopub::IOPub message lacks parent: %r", msg) | |
|
800 | 804 | return |
|
801 | 805 | msg_id = parent['msg_id'] |
|
802 | 806 | msg_type = msg['header']['msg_type'] |
@@ -850,13 +854,13 b' class Hub(SessionFactory):' | |||
|
850 | 854 | """Register a new engine.""" |
|
851 | 855 | content = msg['content'] |
|
852 | 856 | try: |
|
853 |
queue = |
|
|
857 | queue = cast_bytes(content['queue']) | |
|
854 | 858 | except KeyError: |
|
855 | 859 | self.log.error("registration::queue not specified", exc_info=True) |
|
856 | 860 | return |
|
857 | 861 | heart = content.get('heartbeat', None) |
|
858 | 862 | if heart: |
|
859 |
heart = |
|
|
863 | heart = cast_bytes(heart) | |
|
860 | 864 | """register a new engine, and create the socket(s) necessary""" |
|
861 | 865 | eid = self._next_id |
|
862 | 866 | # print (eid, queue, reg, heart) |
@@ -1130,7 +1134,7 b' class Hub(SessionFactory):' | |||
|
1130 | 1134 | |
|
1131 | 1135 | # validate msg_ids |
|
1132 | 1136 | found_ids = [ rec['msg_id'] for rec in records ] |
|
1133 | invalid_ids = filter(lambda m: m in self.pending, found_ids) | |
|
1137 | pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ] | |
|
1134 | 1138 | if len(records) > len(msg_ids): |
|
1135 | 1139 | try: |
|
1136 | 1140 | raise RuntimeError("DB appears to be in an inconsistent state." |
@@ -1143,40 +1147,46 b' class Hub(SessionFactory):' | |||
|
1143 | 1147 | raise KeyError("No such msg(s): %r" % missing) |
|
1144 | 1148 | except KeyError: |
|
1145 | 1149 | return finish(error.wrap_exception()) |
|
1146 |
elif |
|
|
1147 | msg_id = invalid_ids[0] | |
|
1148 | try: | |
|
1149 | raise ValueError("Task %r appears to be inflight" % msg_id) | |
|
1150 | except Exception: | |
|
1151 | return finish(error.wrap_exception()) | |
|
1150 | elif pending_ids: | |
|
1151 | pass | |
|
1152 | # no need to raise on resubmit of pending task, now that we | |
|
1153 | # resubmit under new ID, but do we want to raise anyway? | |
|
1154 | # msg_id = invalid_ids[0] | |
|
1155 | # try: | |
|
1156 | # raise ValueError("Task(s) %r appears to be inflight" % ) | |
|
1157 | # except Exception: | |
|
1158 | # return finish(error.wrap_exception()) | |
|
1159 | ||
|
1160 | # mapping of original IDs to resubmitted IDs | |
|
1161 | resubmitted = {} | |
|
1152 | 1162 | |
|
1153 | # clear the existing records | |
|
1154 | now = datetime.now() | |
|
1155 | rec = empty_record() | |
|
1156 | map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) | |
|
1157 | rec['resubmitted'] = now | |
|
1158 | rec['queue'] = 'task' | |
|
1159 | rec['client_uuid'] = client_id[0] | |
|
1160 | try: | |
|
1161 | for msg_id in msg_ids: | |
|
1162 | self.all_completed.discard(msg_id) | |
|
1163 | self.db.update_record(msg_id, rec) | |
|
1164 | except Exception: | |
|
1165 | self.log.error('db::db error upating record', exc_info=True) | |
|
1166 | reply = error.wrap_exception() | |
|
1167 | else: | |
|
1168 | 1163 |
|
|
1169 | 1164 |
|
|
1170 | 1165 |
|
|
1171 | # include resubmitted in header to prevent digest collision | |
|
1172 | header['resubmitted'] = now | |
|
1173 | 1166 |
|
|
1167 | msg_id = msg['msg_id'] | |
|
1174 | 1168 |
|
|
1169 | header.update(msg['header']) | |
|
1175 | 1170 |
|
|
1176 | msg['header']['msg_id'] = rec['msg_id'] | |
|
1171 | ||
|
1177 | 1172 |
|
|
1178 | 1173 | |
|
1179 | finish(dict(status='ok')) | |
|
1174 | resubmitted[rec['msg_id']] = msg_id | |
|
1175 | self.pending.add(msg_id) | |
|
1176 | msg['buffers'] = [] | |
|
1177 | try: | |
|
1178 | self.db.add_record(msg_id, init_record(msg)) | |
|
1179 | except Exception: | |
|
1180 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |
|
1181 | ||
|
1182 | finish(dict(status='ok', resubmitted=resubmitted)) | |
|
1183 | ||
|
1184 | # store the new IDs in the Task DB | |
|
1185 | for msg_id, resubmit_id in resubmitted.iteritems(): | |
|
1186 | try: | |
|
1187 | self.db.update_record(msg_id, {'resubmitted' : resubmit_id}) | |
|
1188 | except Exception: | |
|
1189 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |
|
1180 | 1190 | |
|
1181 | 1191 | |
|
1182 | 1192 | def _extract_record(self, rec): |
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator' | |||
|
42 | 42 | from IPython.config.application import Application |
|
43 | 43 | from IPython.config.loader import Config |
|
44 | 44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes |
|
45 | from IPython.utils.py3compat import cast_bytes | |
|
45 | 46 | |
|
46 | 47 | from IPython.parallel import error, util |
|
47 | 48 | from IPython.parallel.factory import SessionFactory |
|
48 |
from IPython.parallel.util import connect_logger, local_logger |
|
|
49 | from IPython.parallel.util import connect_logger, local_logger | |
|
49 | 50 | |
|
50 | 51 | from .dependency import Dependency |
|
51 | 52 | |
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):' | |||
|
262 | 263 | self.log.error("Unhandled message type: %r"%msg_type) |
|
263 | 264 | else: |
|
264 | 265 | try: |
|
265 | handler(asbytes(msg['content']['queue'])) | |
|
266 | handler(cast_bytes(msg['content']['queue'])) | |
|
266 | 267 | except Exception: |
|
267 | 268 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
268 | 269 | |
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):' | |||
|
316 | 317 | # prevent double-handling of messages |
|
317 | 318 | continue |
|
318 | 319 | |
|
319 |
raw_msg = lost[msg_id] |
|
|
320 | raw_msg = lost[msg_id].raw_msg | |
|
320 | 321 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
321 | 322 | parent = self.session.unpack(msg[1].bytes) |
|
322 | 323 | idents = [engine, idents[0]] |
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):' | |||
|
370 | 371 | # get targets as a set of bytes objects |
|
371 | 372 | # from a list of unicode objects |
|
372 | 373 | targets = header.get('targets', []) |
|
373 | targets = map(asbytes, targets) | |
|
374 | targets = map(cast_bytes, targets) | |
|
374 | 375 | targets = set(targets) |
|
375 | 376 | |
|
376 | 377 | retries = header.get('retries', 0) |
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):' | |||
|
138 | 138 | 'engine_uuid' : 'text', |
|
139 | 139 | 'started' : 'timestamp', |
|
140 | 140 | 'completed' : 'timestamp', |
|
141 |
'resubmitted' : 't |
|
|
141 | 'resubmitted' : 'text', | |
|
142 | 142 | 'received' : 'timestamp', |
|
143 | 143 | 'result_header' : 'dict text', |
|
144 | 144 | 'result_content' : 'dict text', |
@@ -226,15 +226,16 b' class SQLiteDB(BaseDB):' | |||
|
226 | 226 | # isolation_level = None)#, |
|
227 | 227 | cached_statements=64) |
|
228 | 228 | # print dir(self._db) |
|
229 | first_table = self.table | |
|
229 | first_table = previous_table = self.table | |
|
230 | 230 | i=0 |
|
231 | 231 | while not self._check_table(): |
|
232 | 232 | i+=1 |
|
233 | 233 | self.table = first_table+'_%i'%i |
|
234 | 234 | self.log.warn( |
|
235 | 235 | "Table %s exists and doesn't match db format, trying %s"% |
|
236 |
( |
|
|
236 | (previous_table, self.table) | |
|
237 | 237 | ) |
|
238 | previous_table = self.table | |
|
238 | 239 | |
|
239 | 240 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s |
|
240 | 241 | (msg_id text PRIMARY KEY, |
@@ -246,7 +247,7 b' class SQLiteDB(BaseDB):' | |||
|
246 | 247 | engine_uuid text, |
|
247 | 248 | started timestamp, |
|
248 | 249 | completed timestamp, |
|
249 |
resubmitted t |
|
|
250 | resubmitted text, | |
|
250 | 251 | received timestamp, |
|
251 | 252 | result_header dict text, |
|
252 | 253 | result_content dict text, |
@@ -27,15 +27,14 b' from IPython.external.ssh import tunnel' | |||
|
27 | 27 | from IPython.utils.traitlets import ( |
|
28 | 28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool |
|
29 | 29 | ) |
|
30 |
from IPython.utils import |
|
|
30 | from IPython.utils.py3compat import cast_bytes | |
|
31 | 31 | |
|
32 | 32 | from IPython.parallel.controller.heartmonitor import Heart |
|
33 | 33 | from IPython.parallel.factory import RegistrationFactory |
|
34 |
from IPython.parallel.util import disambiguate_url |
|
|
34 | from IPython.parallel.util import disambiguate_url | |
|
35 | 35 | |
|
36 | 36 | from IPython.zmq.session import Message |
|
37 | ||
|
38 | from .streamkernel import Kernel | |
|
37 | from IPython.zmq.ipkernel import Kernel | |
|
39 | 38 | |
|
40 | 39 | class EngineFactory(RegistrationFactory): |
|
41 | 40 | """IPython engine""" |
@@ -70,7 +69,7 b' class EngineFactory(RegistrationFactory):' | |||
|
70 | 69 | bident = CBytes() |
|
71 | 70 | ident = Unicode() |
|
72 | 71 | def _ident_changed(self, name, old, new): |
|
73 | self.bident = asbytes(new) | |
|
72 | self.bident = cast_bytes(new) | |
|
74 | 73 | using_ssh=Bool(False) |
|
75 | 74 | |
|
76 | 75 | |
@@ -185,23 +184,27 b' class EngineFactory(RegistrationFactory):' | |||
|
185 | 184 | |
|
186 | 185 | # create iopub stream: |
|
187 | 186 | iopub_addr = msg.content.iopub |
|
188 |
iopub_s |
|
|
189 |
iopub_s |
|
|
190 |
connect(iopub_s |
|
|
187 | iopub_socket = ctx.socket(zmq.PUB) | |
|
188 | iopub_socket.setsockopt(zmq.IDENTITY, identity) | |
|
189 | connect(iopub_socket, iopub_addr) | |
|
190 | ||
|
191 | # disable history: | |
|
192 | self.config.HistoryManager.hist_file = ':memory:' | |
|
191 | 193 | |
|
192 |
|
|
|
194 | # Redirect input streams and set a display hook. | |
|
193 | 195 | if self.out_stream_factory: |
|
194 |
sys.stdout = self.out_stream_factory(self.session, iopub_s |
|
|
195 |
sys.stdout.topic = |
|
|
196 |
sys.stderr = self.out_stream_factory(self.session, iopub_s |
|
|
197 |
sys.stderr.topic = |
|
|
196 | sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') | |
|
197 | sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id) | |
|
198 | sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') | |
|
199 | sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id) | |
|
198 | 200 | if self.display_hook_factory: |
|
199 |
sys.displayhook = self.display_hook_factory(self.session, iopub_s |
|
|
200 |
sys.displayhook.topic = |
|
|
201 | sys.displayhook = self.display_hook_factory(self.session, iopub_socket) | |
|
202 | sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id) | |
|
201 | 203 | |
|
202 | 204 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, |
|
203 |
control_stream=control_stream, shell_streams=shell_streams, iopub_s |
|
|
205 | control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, | |
|
204 | 206 |
loop=loop, user_ns |
|
207 | self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) | |
|
205 | 208 | self.kernel.start() |
|
206 | 209 | |
|
207 | 210 |
@@ -65,7 +65,7 b' def setup():' | |||
|
65 | 65 | if cp.poll() is not None: |
|
66 | 66 | print cp.poll() |
|
67 | 67 | raise RuntimeError("The test controller failed to start.") |
|
68 |
elif time.time()-tic > 1 |
|
|
68 | elif time.time()-tic > 15: | |
|
69 | 69 | raise RuntimeError("Timeout waiting for the test controller to start.") |
|
70 | 70 | time.sleep(0.1) |
|
71 | 71 | add_engines(1) |
@@ -93,7 +93,7 b" def add_engines(n=1, profile='iptest', total=False):" | |||
|
93 | 93 | while len(rc) < base+n: |
|
94 | 94 | if any([ ep.poll() is not None for ep in eps ]): |
|
95 | 95 | raise RuntimeError("A test engine failed to start.") |
|
96 |
elif time.time()-tic > 1 |
|
|
96 | elif time.time()-tic > 15: | |
|
97 | 97 | raise RuntimeError("Timeout waiting for engines to connect.") |
|
98 | 98 | time.sleep(.1) |
|
99 | 99 | rc.spin() |
@@ -154,7 +154,7 b' class AsyncResultTest(ClusterTestCase):' | |||
|
154 | 154 | ar = v.apply_async(time.sleep, 0.25) |
|
155 | 155 | self.assertRaises(TimeoutError, getattr, ar, 'serial_time') |
|
156 | 156 | ar.get(2) |
|
157 |
self.assertTrue(ar.serial_time < |
|
|
157 | self.assertTrue(ar.serial_time < 1.) | |
|
158 | 158 | self.assertTrue(ar.serial_time > 0.2) |
|
159 | 159 | |
|
160 | 160 | def test_serial_time_multi(self): |
@@ -171,8 +171,8 b' class AsyncResultTest(ClusterTestCase):' | |||
|
171 | 171 | ar = v.apply_async(time.sleep, 0.25) |
|
172 | 172 | while not ar.ready(): |
|
173 | 173 | time.sleep(0.01) |
|
174 |
self.assertTrue(ar.elapsed < |
|
|
175 |
self.assertTrue(ar.elapsed < |
|
|
174 | self.assertTrue(ar.elapsed < 1) | |
|
175 | self.assertTrue(ar.elapsed < 1) | |
|
176 | 176 | ar.get(2) |
|
177 | 177 | |
|
178 | 178 | def test_elapsed_multi(self): |
@@ -180,8 +180,8 b' class AsyncResultTest(ClusterTestCase):' | |||
|
180 | 180 | ar = v.apply_async(time.sleep, 0.25) |
|
181 | 181 | while not ar.ready(): |
|
182 | 182 | time.sleep(0.01) |
|
183 |
self.assertTrue(ar.elapsed < |
|
|
184 |
self.assertTrue(ar.elapsed < |
|
|
183 | self.assertTrue(ar.elapsed < 1) | |
|
184 | self.assertTrue(ar.elapsed < 1) | |
|
185 | 185 | ar.get(2) |
|
186 | 186 | |
|
187 | 187 | def test_hubresult_timestamps(self): |
@@ -280,6 +280,25 b' class TestClient(ClusterTestCase):' | |||
|
280 | 280 | time.sleep(0.25) |
|
281 | 281 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) |
|
282 | 282 | |
|
283 | def _wait_for_idle(self): | |
|
284 | """wait for an engine to become idle, according to the Hub""" | |
|
285 | rc = self.client | |
|
286 | ||
|
287 | # timeout 2s, polling every 100ms | |
|
288 | for i in range(20): | |
|
289 | qs = rc.queue_status() | |
|
290 | if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids): | |
|
291 | time.sleep(0.1) | |
|
292 | else: | |
|
293 | break | |
|
294 | ||
|
295 | # ensure Hub up to date: | |
|
296 | qs = rc.queue_status() | |
|
297 | self.assertEquals(qs['unassigned'], 0) | |
|
298 | for eid in rc.ids: | |
|
299 | self.assertEquals(qs[eid]['tasks'], 0) | |
|
300 | ||
|
301 | ||
|
283 | 302 | def test_resubmit(self): |
|
284 | 303 | def f(): |
|
285 | 304 | import random |
@@ -288,19 +307,38 b' class TestClient(ClusterTestCase):' | |||
|
288 | 307 | ar = v.apply_async(f) |
|
289 | 308 | r1 = ar.get(1) |
|
290 | 309 | # give the Hub a chance to notice: |
|
291 | time.sleep(0.5) | |
|
310 | self._wait_for_idle() | |
|
292 | 311 | ahr = self.client.resubmit(ar.msg_ids) |
|
293 | 312 | r2 = ahr.get(1) |
|
294 | 313 | self.assertFalse(r1 == r2) |
|
295 | 314 | |
|
315 | def test_resubmit_aborted(self): | |
|
316 | def f(): | |
|
317 | import random | |
|
318 | return random.random() | |
|
319 | v = self.client.load_balanced_view() | |
|
320 | # restrict to one engine, so we can put a sleep | |
|
321 | # ahead of the task, so it will get aborted | |
|
322 | eid = self.client.ids[-1] | |
|
323 | v.targets = [eid] | |
|
324 | sleep = v.apply_async(time.sleep, 0.5) | |
|
325 | ar = v.apply_async(f) | |
|
326 | ar.abort() | |
|
327 | self.assertRaises(error.TaskAborted, ar.get) | |
|
328 | # Give the Hub a chance to get up to date: | |
|
329 | self._wait_for_idle() | |
|
330 | ahr = self.client.resubmit(ar.msg_ids) | |
|
331 | r2 = ahr.get(1) | |
|
332 | ||
|
296 | 333 | def test_resubmit_inflight(self): |
|
297 |
""" |
|
|
334 | """resubmit of inflight task""" | |
|
298 | 335 | v = self.client.load_balanced_view() |
|
299 | 336 | ar = v.apply_async(time.sleep,1) |
|
300 | 337 | # give the message a chance to arrive |
|
301 | 338 | time.sleep(0.2) |
|
302 |
|
|
|
339 | ahr = self.client.resubmit(ar.msg_ids) | |
|
303 | 340 | ar.get(2) |
|
341 | ahr.get(2) | |
|
304 | 342 | |
|
305 | 343 | def test_resubmit_badkey(self): |
|
306 | 344 | """ensure KeyError on resubmit of nonexistant task""" |
@@ -18,6 +18,7 b' Authors:' | |||
|
18 | 18 | |
|
19 | 19 | from __future__ import division |
|
20 | 20 | |
|
21 | import logging | |
|
21 | 22 | import os |
|
22 | 23 | import tempfile |
|
23 | 24 | import time |
@@ -226,7 +227,9 b' class TestSQLiteBackend(TestDictBackend):' | |||
|
226 | 227 | @dec.skip_without('sqlite3') |
|
227 | 228 | def create_db(self): |
|
228 | 229 | location, fname = os.path.split(temp_db) |
|
229 | return SQLiteDB(location=location, fname=fname) | |
|
230 | log = logging.getLogger('test') | |
|
231 | log.setLevel(logging.CRITICAL) | |
|
232 | return SQLiteDB(location=location, fname=fname, log=log) | |
|
230 | 233 | |
|
231 | 234 | def tearDown(self): |
|
232 | 235 | self.db._db.close() |
@@ -25,6 +25,7 b' import zmq' | |||
|
25 | 25 | from nose import SkipTest |
|
26 | 26 | |
|
27 | 27 | from IPython.testing import decorators as dec |
|
28 | from IPython.testing.ipunittest import ParametricTestCase | |
|
28 | 29 | |
|
29 | 30 | from IPython import parallel as pmod |
|
30 | 31 | from IPython.parallel import error |
@@ -39,7 +40,7 b' from .clienttest import ClusterTestCase, crash, wait, skip_without' | |||
|
39 | 40 | def setup(): |
|
40 | 41 | add_engines(3, total=True) |
|
41 | 42 | |
|
42 | class TestView(ClusterTestCase): | |
|
43 | class TestView(ClusterTestCase, ParametricTestCase): | |
|
43 | 44 | |
|
44 | 45 | def test_z_crash_mux(self): |
|
45 | 46 | """test graceful handling of engine death (direct)""" |
@@ -421,16 +422,16 b' class TestView(ClusterTestCase):' | |||
|
421 | 422 | sys.stdout = sio |
|
422 | 423 | ip.magic_autopx() |
|
423 | 424 | ip.run_cell('\n'.join(('a=5','b=10','c=0'))) |
|
424 |
ip.run_cell(' |
|
|
425 | ip.run_cell('b*=2') | |
|
426 | ip.run_cell('print (b)') | |
|
425 | 427 | ip.run_cell("b/c") |
|
426 | ip.run_code(compile('b*=2', '', 'single')) | |
|
427 | 428 | ip.magic_autopx() |
|
428 | 429 | sys.stdout = savestdout |
|
429 | 430 | output = sio.getvalue().strip() |
|
430 | 431 | self.assertTrue(output.startswith('%autopx enabled')) |
|
431 | 432 | self.assertTrue(output.endswith('%autopx disabled')) |
|
432 | 433 | self.assertTrue('RemoteError: ZeroDivisionError' in output) |
|
433 |
ar = v.get_result(- |
|
|
434 | ar = v.get_result(-1) | |
|
434 | 435 | self.assertEquals(v['a'], 5) |
|
435 | 436 | self.assertEquals(v['b'], 20) |
|
436 | 437 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
@@ -446,9 +447,10 b' class TestView(ClusterTestCase):' | |||
|
446 | 447 | sys.stdout = sio |
|
447 | 448 | ip.magic_autopx() |
|
448 | 449 | ip.run_cell('\n'.join(('a=5','b=10','c=0'))) |
|
449 | ip.run_cell('print b') | |
|
450 | ip.run_cell('print (b)') | |
|
451 | ip.run_cell('import time; time.sleep(0.1)') | |
|
450 | 452 | ip.run_cell("b/c") |
|
451 |
ip.run_c |
|
|
453 | ip.run_cell('b*=2') | |
|
452 | 454 | ip.magic_autopx() |
|
453 | 455 | sys.stdout = savestdout |
|
454 | 456 | output = sio.getvalue().strip() |
@@ -456,9 +458,12 b' class TestView(ClusterTestCase):' | |||
|
456 | 458 | self.assertTrue(output.endswith('%autopx disabled')) |
|
457 | 459 | self.assertFalse('ZeroDivisionError' in output) |
|
458 | 460 | ar = v.get_result(-2) |
|
459 | self.assertEquals(v['a'], 5) | |
|
460 | self.assertEquals(v['b'], 20) | |
|
461 | 461 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
|
462 | # prevent TaskAborted on pulls, due to ZeroDivisionError | |
|
463 | time.sleep(0.5) | |
|
464 | self.assertEquals(v['a'], 5) | |
|
465 | # b*=2 will not fire, due to abort | |
|
466 | self.assertEquals(v['b'], 10) | |
|
462 | 467 | |
|
463 | 468 | def test_magic_result(self): |
|
464 | 469 | ip = get_ipython() |
@@ -551,3 +556,139 b' class TestView(ClusterTestCase):' | |||
|
551 | 556 | result = e0.map_sync(lambda x: -1*x, r) |
|
552 | 557 | self.assertEquals(result, check) |
|
553 | 558 | |
|
559 | def test_len(self): | |
|
560 | """len(view) makes sense""" | |
|
561 | e0 = self.client[self.client.ids[0]] | |
|
562 | yield self.assertEquals(len(e0), 1) | |
|
563 | v = self.client[:] | |
|
564 | yield self.assertEquals(len(v), len(self.client.ids)) | |
|
565 | v = self.client.direct_view('all') | |
|
566 | yield self.assertEquals(len(v), len(self.client.ids)) | |
|
567 | v = self.client[:2] | |
|
568 | yield self.assertEquals(len(v), 2) | |
|
569 | v = self.client[:1] | |
|
570 | yield self.assertEquals(len(v), 1) | |
|
571 | v = self.client.load_balanced_view() | |
|
572 | yield self.assertEquals(len(v), len(self.client.ids)) | |
|
573 | # parametric tests seem to require manual closing? | |
|
574 | self.client.close() | |
|
575 | ||
|
576 | ||
|
577 | # begin execute tests | |
|
578 | def _wait_for(self, f, timeout=10): | |
|
579 | tic = time.time() | |
|
580 | while time.time() <= tic + timeout: | |
|
581 | if f(): | |
|
582 | return | |
|
583 | time.sleep(0.1) | |
|
584 | self.client.spin() | |
|
585 | if not f(): | |
|
586 | print "Warning: Awaited condition never arrived" | |
|
587 | ||
|
588 | ||
|
589 | def test_execute_reply(self): | |
|
590 | e0 = self.client[self.client.ids[0]] | |
|
591 | e0.block = True | |
|
592 | ar = e0.execute("5", silent=False) | |
|
593 | er = ar.get() | |
|
594 | self._wait_for(lambda : bool(er.pyout)) | |
|
595 | self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count) | |
|
596 | self.assertEquals(er.pyout['data']['text/plain'], '5') | |
|
597 | ||
|
598 | def test_execute_reply_stdout(self): | |
|
599 | e0 = self.client[self.client.ids[0]] | |
|
600 | e0.block = True | |
|
601 | ar = e0.execute("print (5)", silent=False) | |
|
602 | er = ar.get() | |
|
603 | self._wait_for(lambda : bool(er.stdout)) | |
|
604 | self.assertEquals(er.stdout.strip(), '5') | |
|
605 | ||
|
606 | def test_execute_pyout(self): | |
|
607 | """execute triggers pyout with silent=False""" | |
|
608 | view = self.client[:] | |
|
609 | ar = view.execute("5", silent=False, block=True) | |
|
610 | self._wait_for(lambda : all(ar.pyout)) | |
|
611 | ||
|
612 | expected = [{'text/plain' : '5'}] * len(view) | |
|
613 | mimes = [ out['data'] for out in ar.pyout ] | |
|
614 | self.assertEquals(mimes, expected) | |
|
615 | ||
|
616 | def test_execute_silent(self): | |
|
617 | """execute does not trigger pyout with silent=True""" | |
|
618 | view = self.client[:] | |
|
619 | ar = view.execute("5", block=True) | |
|
620 | expected = [None] * len(view) | |
|
621 | self.assertEquals(ar.pyout, expected) | |
|
622 | ||
|
623 | def test_execute_magic(self): | |
|
624 | """execute accepts IPython commands""" | |
|
625 | view = self.client[:] | |
|
626 | view.execute("a = 5") | |
|
627 | ar = view.execute("%whos", block=True) | |
|
628 | # this will raise, if that failed | |
|
629 | ar.get(5) | |
|
630 | self._wait_for(lambda : all(ar.stdout)) | |
|
631 | for stdout in ar.stdout: | |
|
632 | lines = stdout.splitlines() | |
|
633 | self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info']) | |
|
634 | found = False | |
|
635 | for line in lines[2:]: | |
|
636 | split = line.split() | |
|
637 | if split == ['a', 'int', '5']: | |
|
638 | found = True | |
|
639 | break | |
|
640 | self.assertTrue(found, "whos output wrong: %s" % stdout) | |
|
641 | ||
|
642 | def test_execute_displaypub(self): | |
|
643 | """execute tracks display_pub output""" | |
|
644 | view = self.client[:] | |
|
645 | view.execute("from IPython.core.display import *") | |
|
646 | ar = view.execute("[ display(i) for i in range(5) ]", block=True) | |
|
647 | ||
|
648 | self._wait_for(lambda : all(len(er.outputs) >= 5 for er in ar)) | |
|
649 | expected = [ {u'text/plain' : unicode(j)} for j in range(5) ] | |
|
650 | for outputs in ar.outputs: | |
|
651 | mimes = [ out['data'] for out in outputs ] | |
|
652 | self.assertEquals(mimes, expected) | |
|
653 | ||
|
654 | def test_apply_displaypub(self): | |
|
655 | """apply tracks display_pub output""" | |
|
656 | view = self.client[:] | |
|
657 | view.execute("from IPython.core.display import *") | |
|
658 | ||
|
659 | @interactive | |
|
660 | def publish(): | |
|
661 | [ display(i) for i in range(5) ] | |
|
662 | ||
|
663 | ar = view.apply_async(publish) | |
|
664 | ar.get(5) | |
|
665 | self._wait_for(lambda : all(len(out) >= 5 for out in ar.outputs)) | |
|
666 | expected = [ {u'text/plain' : unicode(j)} for j in range(5) ] | |
|
667 | for outputs in ar.outputs: | |
|
668 | mimes = [ out['data'] for out in outputs ] | |
|
669 | self.assertEquals(mimes, expected) | |
|
670 | ||
|
671 | def test_execute_raises(self): | |
|
672 | """exceptions in execute requests raise appropriately""" | |
|
673 | view = self.client[-1] | |
|
674 | ar = view.execute("1/0") | |
|
675 | self.assertRaisesRemote(ZeroDivisionError, ar.get, 2) | |
|
676 | ||
|
677 | @dec.skipif_not_matplotlib | |
|
678 | def test_magic_pylab(self): | |
|
679 | """%pylab works on engines""" | |
|
680 | view = self.client[-1] | |
|
681 | ar = view.execute("%pylab inline") | |
|
682 | # at least check if this raised: | |
|
683 | reply = ar.get(5) | |
|
684 | # include imports, in case user config | |
|
685 | ar = view.execute("plot(rand(100))", silent=False) | |
|
686 | reply = ar.get(5) | |
|
687 | self._wait_for(lambda : all(ar.outputs)) | |
|
688 | self.assertEquals(len(reply.outputs), 1) | |
|
689 | output = reply.outputs[0] | |
|
690 | self.assertTrue("data" in output) | |
|
691 | data = output['data'] | |
|
692 | self.assertTrue("image/png" in data) | |
|
693 | ||
|
694 |
@@ -47,6 +47,9 b' from IPython.utils import py3compat' | |||
|
47 | 47 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence |
|
48 | 48 | from IPython.utils.newserialized import serialize, unserialize |
|
49 | 49 | from IPython.zmq.log import EnginePUBHandler |
|
50 | from IPython.zmq.serialize import ( | |
|
51 | unserialize_object, serialize_object, pack_apply_message, unpack_apply_message | |
|
52 | ) | |
|
50 | 53 | |
|
51 | 54 | if py3compat.PY3: |
|
52 | 55 | buffer = memoryview |
@@ -121,12 +124,6 b' def log_errors(f, self, *args, **kwargs):' | |||
|
121 | 124 | self.log.error("Uncaught exception in %r" % f, exc_info=True) |
|
122 | 125 | |
|
123 | 126 | |
|
124 | def asbytes(s): | |
|
125 | """ensure that an object is ascii bytes""" | |
|
126 | if isinstance(s, unicode): | |
|
127 | s = s.encode('ascii') | |
|
128 | return s | |
|
129 | ||
|
130 | 127 | def is_url(url): |
|
131 | 128 | """boolean check for whether a string is a zmq url""" |
|
132 | 129 | if '://' not in url: |
@@ -222,140 +219,6 b' def disambiguate_url(url, location=None):' | |||
|
222 | 219 | |
|
223 | 220 | return "%s://%s:%s"%(proto,ip,port) |
|
224 | 221 | |
|
225 | def serialize_object(obj, threshold=64e-6): | |
|
226 | """Serialize an object into a list of sendable buffers. | |
|
227 | ||
|
228 | Parameters | |
|
229 | ---------- | |
|
230 | ||
|
231 | obj : object | |
|
232 | The object to be serialized | |
|
233 | threshold : float | |
|
234 | The threshold for not double-pickling the content. | |
|
235 | ||
|
236 | ||
|
237 | Returns | |
|
238 | ------- | |
|
239 | ('pmd', [bufs]) : | |
|
240 | where pmd is the pickled metadata wrapper, | |
|
241 | bufs is a list of data buffers | |
|
242 | """ | |
|
243 | databuffers = [] | |
|
244 | if isinstance(obj, (list, tuple)): | |
|
245 | clist = canSequence(obj) | |
|
246 | slist = map(serialize, clist) | |
|
247 | for s in slist: | |
|
248 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |
|
249 | databuffers.append(s.getData()) | |
|
250 | s.data = None | |
|
251 | return pickle.dumps(slist,-1), databuffers | |
|
252 | elif isinstance(obj, dict): | |
|
253 | sobj = {} | |
|
254 | for k in sorted(obj.iterkeys()): | |
|
255 | s = serialize(can(obj[k])) | |
|
256 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |
|
257 | databuffers.append(s.getData()) | |
|
258 | s.data = None | |
|
259 | sobj[k] = s | |
|
260 | return pickle.dumps(sobj,-1),databuffers | |
|
261 | else: | |
|
262 | s = serialize(can(obj)) | |
|
263 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |
|
264 | databuffers.append(s.getData()) | |
|
265 | s.data = None | |
|
266 | return pickle.dumps(s,-1),databuffers | |
|
267 | ||
|
268 | ||
|
269 | def unserialize_object(bufs): | |
|
270 | """reconstruct an object serialized by serialize_object from data buffers.""" | |
|
271 | bufs = list(bufs) | |
|
272 | sobj = pickle.loads(bufs.pop(0)) | |
|
273 | if isinstance(sobj, (list, tuple)): | |
|
274 | for s in sobj: | |
|
275 | if s.data is None: | |
|
276 | s.data = bufs.pop(0) | |
|
277 | return uncanSequence(map(unserialize, sobj)), bufs | |
|
278 | elif isinstance(sobj, dict): | |
|
279 | newobj = {} | |
|
280 | for k in sorted(sobj.iterkeys()): | |
|
281 | s = sobj[k] | |
|
282 | if s.data is None: | |
|
283 | s.data = bufs.pop(0) | |
|
284 | newobj[k] = uncan(unserialize(s)) | |
|
285 | return newobj, bufs | |
|
286 | else: | |
|
287 | if sobj.data is None: | |
|
288 | sobj.data = bufs.pop(0) | |
|
289 | return uncan(unserialize(sobj)), bufs | |
|
290 | ||
|
291 | def pack_apply_message(f, args, kwargs, threshold=64e-6): | |
|
292 | """pack up a function, args, and kwargs to be sent over the wire | |
|
293 | as a series of buffers. Any object whose data is larger than `threshold` | |
|
294 | will not have their data copied (currently only numpy arrays support zero-copy)""" | |
|
295 | msg = [pickle.dumps(can(f),-1)] | |
|
296 | databuffers = [] # for large objects | |
|
297 | sargs, bufs = serialize_object(args,threshold) | |
|
298 | msg.append(sargs) | |
|
299 | databuffers.extend(bufs) | |
|
300 | skwargs, bufs = serialize_object(kwargs,threshold) | |
|
301 | msg.append(skwargs) | |
|
302 | databuffers.extend(bufs) | |
|
303 | msg.extend(databuffers) | |
|
304 | return msg | |
|
305 | ||
|
306 | def unpack_apply_message(bufs, g=None, copy=True): | |
|
307 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |
|
308 | Returns: original f,args,kwargs""" | |
|
309 | bufs = list(bufs) # allow us to pop | |
|
310 | assert len(bufs) >= 3, "not enough buffers!" | |
|
311 | if not copy: | |
|
312 | for i in range(3): | |
|
313 | bufs[i] = bufs[i].bytes | |
|
314 | cf = pickle.loads(bufs.pop(0)) | |
|
315 | sargs = list(pickle.loads(bufs.pop(0))) | |
|
316 | skwargs = dict(pickle.loads(bufs.pop(0))) | |
|
317 | # print sargs, skwargs | |
|
318 | f = uncan(cf, g) | |
|
319 | for sa in sargs: | |
|
320 | if sa.data is None: | |
|
321 | m = bufs.pop(0) | |
|
322 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |
|
323 | # always use a buffer, until memoryviews get sorted out | |
|
324 | sa.data = buffer(m) | |
|
325 | # disable memoryview support | |
|
326 | # if copy: | |
|
327 | # sa.data = buffer(m) | |
|
328 | # else: | |
|
329 | # sa.data = m.buffer | |
|
330 | else: | |
|
331 | if copy: | |
|
332 | sa.data = m | |
|
333 | else: | |
|
334 | sa.data = m.bytes | |
|
335 | ||
|
336 | args = uncanSequence(map(unserialize, sargs), g) | |
|
337 | kwargs = {} | |
|
338 | for k in sorted(skwargs.iterkeys()): | |
|
339 | sa = skwargs[k] | |
|
340 | if sa.data is None: | |
|
341 | m = bufs.pop(0) | |
|
342 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |
|
343 | # always use a buffer, until memoryviews get sorted out | |
|
344 | sa.data = buffer(m) | |
|
345 | # disable memoryview support | |
|
346 | # if copy: | |
|
347 | # sa.data = buffer(m) | |
|
348 | # else: | |
|
349 | # sa.data = m.buffer | |
|
350 | else: | |
|
351 | if copy: | |
|
352 | sa.data = m | |
|
353 | else: | |
|
354 | sa.data = m.bytes | |
|
355 | ||
|
356 | kwargs[k] = uncan(unserialize(sa), g) | |
|
357 | ||
|
358 | return f,args,kwargs | |
|
359 | 222 | |
|
360 | 223 | #-------------------------------------------------------------------------- |
|
361 | 224 | # helpers for implementing old MEC API via view.apply |
@@ -43,6 +43,7 b' class ZMQShellDisplayHook(DisplayHook):' | |||
|
43 | 43 | """A displayhook subclass that publishes data using ZeroMQ. This is intended |
|
44 | 44 | to work with an InteractiveShell instance. It sends a dict of different |
|
45 | 45 | representations of the object.""" |
|
46 | topic=None | |
|
46 | 47 | |
|
47 | 48 | session = Instance(Session) |
|
48 | 49 | pub_socket = Instance('zmq.Socket') |
@@ -67,6 +68,6 b' class ZMQShellDisplayHook(DisplayHook):' | |||
|
67 | 68 | """Finish up all displayhook activities.""" |
|
68 | 69 | sys.stdout.flush() |
|
69 | 70 | sys.stderr.flush() |
|
70 | self.session.send(self.pub_socket, self.msg) | |
|
71 | self.session.send(self.pub_socket, self.msg, ident=self.topic) | |
|
71 | 72 | self.msg = None |
|
72 | 73 |
@@ -20,8 +20,10 b' import sys' | |||
|
20 | 20 | import zmq |
|
21 | 21 | |
|
22 | 22 | # Local imports. |
|
23 | from IPython.config.application import Application | |
|
23 | 24 | from IPython.utils import io |
|
24 | 25 | |
|
26 | ||
|
25 | 27 | #------------------------------------------------------------------------------ |
|
26 | 28 | # Eventloops for integrating the Kernel into different GUIs |
|
27 | 29 | #------------------------------------------------------------------------------ |
@@ -164,7 +166,10 b' def loop_cocoa(kernel):' | |||
|
164 | 166 | # but still need a Poller for when there are no active windows, |
|
165 | 167 | # during which time mainloop() returns immediately |
|
166 | 168 | poller = zmq.Poller() |
|
167 | poller.register(kernel.shell_socket, zmq.POLLIN) | |
|
169 | if kernel.control_stream: | |
|
170 | poller.register(kernel.control_stream.socket, zmq.POLLIN) | |
|
171 | for stream in kernel.shell_streams: | |
|
172 | poller.register(stream.socket, zmq.POLLIN) | |
|
168 | 173 | |
|
169 | 174 | while True: |
|
170 | 175 | try: |
@@ -203,11 +208,15 b' loop_map = {' | |||
|
203 | 208 | |
|
204 | 209 | def enable_gui(gui, kernel=None): |
|
205 | 210 | """Enable integration with a given GUI""" |
|
206 | if kernel is None: | |
|
207 | from .ipkernel import IPKernelApp | |
|
208 | kernel = IPKernelApp.instance().kernel | |
|
209 | 211 | if gui not in loop_map: |
|
210 | 212 | raise ValueError("GUI %r not supported" % gui) |
|
213 | if kernel is None: | |
|
214 | if Application.initialized(): | |
|
215 | kernel = getattr(Application.instance(), 'kernel', None) | |
|
216 | if kernel is None: | |
|
217 | raise RuntimeError("You didn't specify a kernel," | |
|
218 | " and no IPython Application with a kernel appears to be running." | |
|
219 | ) | |
|
211 | 220 | loop = loop_map[gui] |
|
212 | 221 | if kernel.eventloop is not None and kernel.eventloop is not loop: |
|
213 | 222 | raise RuntimeError("Cannot activate multiple GUI eventloops") |
@@ -15,20 +15,26 b' Things to do:' | |||
|
15 | 15 | #----------------------------------------------------------------------------- |
|
16 | 16 | from __future__ import print_function |
|
17 | 17 | |
|
18 |
# Standard library imports |
|
|
18 | # Standard library imports | |
|
19 | 19 | import __builtin__ |
|
20 | 20 | import atexit |
|
21 | 21 | import sys |
|
22 | 22 | import time |
|
23 | 23 | import traceback |
|
24 | 24 | import logging |
|
25 | import uuid | |
|
26 | ||
|
27 | from datetime import datetime | |
|
25 | 28 | from signal import ( |
|
26 | signal, default_int_handler, SIGINT, SIG_IGN | |
|
29 | signal, getsignal, default_int_handler, SIGINT, SIG_IGN | |
|
27 | 30 | ) |
|
28 | # System library imports. | |
|
31 | ||
|
32 | # System library imports | |
|
29 | 33 | import zmq |
|
34 | from zmq.eventloop import ioloop | |
|
35 | from zmq.eventloop.zmqstream import ZMQStream | |
|
30 | 36 | |
|
31 |
# Local imports |
|
|
37 | # Local imports | |
|
32 | 38 | from IPython.core import pylabtools |
|
33 | 39 | from IPython.config.configurable import Configurable |
|
34 | 40 | from IPython.config.application import boolean_flag, catch_config_error |
@@ -42,11 +48,12 b' from IPython.utils import py3compat' | |||
|
42 | 48 | from IPython.utils.frame import extract_module_locals |
|
43 | 49 | from IPython.utils.jsonutil import json_clean |
|
44 | 50 | from IPython.utils.traitlets import ( |
|
45 | Any, Instance, Float, Dict, CaselessStrEnum | |
|
51 | Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode | |
|
46 | 52 | ) |
|
47 | 53 | |
|
48 | 54 | from entry_point import base_launch_kernel |
|
49 | 55 | from kernelapp import KernelApp, kernel_flags, kernel_aliases |
|
56 | from serialize import serialize_object, unpack_apply_message | |
|
50 | 57 | from session import Session, Message |
|
51 | 58 | from zmqshell import ZMQInteractiveShell |
|
52 | 59 | |
@@ -63,16 +70,21 b' class Kernel(Configurable):' | |||
|
63 | 70 | |
|
64 | 71 | # attribute to override with a GUI |
|
65 | 72 | eventloop = Any(None) |
|
73 | def _eventloop_changed(self, name, old, new): | |
|
74 | """schedule call to eventloop from IOLoop""" | |
|
75 | loop = ioloop.IOLoop.instance() | |
|
76 | loop.add_timeout(time.time()+0.1, self.enter_eventloop) | |
|
66 | 77 | |
|
67 | 78 | shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') |
|
68 | 79 | session = Instance(Session) |
|
69 | 80 | profile_dir = Instance('IPython.core.profiledir.ProfileDir') |
|
70 | shell_socket = Instance('zmq.Socket') | |
|
71 | iopub_socket = Instance('zmq.Socket') | |
|
72 |
|
|
|
81 | shell_streams = List() | |
|
82 | control_stream = Instance(ZMQStream) | |
|
83 | iopub_socket = Instance(zmq.Socket) | |
|
84 | stdin_socket = Instance(zmq.Socket) | |
|
73 | 85 | log = Instance(logging.Logger) |
|
74 | 86 | |
|
75 |
user_module = |
|
|
87 | user_module = Any() | |
|
76 | 88 | def _user_module_changed(self, name, old, new): |
|
77 | 89 | if self.shell is not None: |
|
78 | 90 | self.shell.user_module = new |
@@ -83,6 +95,14 b' class Kernel(Configurable):' | |||
|
83 | 95 | self.shell.user_ns = new |
|
84 | 96 | self.shell.init_user_ns() |
|
85 | 97 | |
|
98 | # identities: | |
|
99 | int_id = Integer(-1) | |
|
100 | ident = Unicode() | |
|
101 | ||
|
102 | def _ident_default(self): | |
|
103 | return unicode(uuid.uuid4()) | |
|
104 | ||
|
105 | ||
|
86 | 106 | # Private interface |
|
87 | 107 | |
|
88 | 108 | # Time to sleep after flushing the stdout/err buffers in each execute |
@@ -110,15 +130,13 b' class Kernel(Configurable):' | |||
|
110 | 130 | # by record_ports and used by connect_request. |
|
111 | 131 | _recorded_ports = Dict() |
|
112 | 132 | |
|
133 | # set of aborted msg_ids | |
|
134 | aborted = Set() | |
|
113 | 135 | |
|
114 | 136 | |
|
115 | 137 | def __init__(self, **kwargs): |
|
116 | 138 | super(Kernel, self).__init__(**kwargs) |
|
117 | 139 | |
|
118 | # Before we even start up the shell, register *first* our exit handlers | |
|
119 | # so they come before the shell's | |
|
120 | atexit.register(self._at_shutdown) | |
|
121 | ||
|
122 | 140 | # Initialize the InteractiveShell subclass |
|
123 | 141 | self.shell = ZMQInteractiveShell.instance(config=self.config, |
|
124 | 142 | profile_dir = self.profile_dir, |
@@ -127,6 +145,7 b' class Kernel(Configurable):' | |||
|
127 | 145 | ) |
|
128 | 146 | self.shell.displayhook.session = self.session |
|
129 | 147 | self.shell.displayhook.pub_socket = self.iopub_socket |
|
148 | self.shell.displayhook.topic = self._topic('pyout') | |
|
130 | 149 | self.shell.display_pub.session = self.session |
|
131 | 150 | self.shell.display_pub.pub_socket = self.iopub_socket |
|
132 | 151 | |
@@ -136,96 +155,131 b' class Kernel(Configurable):' | |||
|
136 | 155 | # Build dict of handlers for message types |
|
137 | 156 | msg_types = [ 'execute_request', 'complete_request', |
|
138 | 157 | 'object_info_request', 'history_request', |
|
139 |
'connect_request', 'shutdown_request' |
|
|
140 | self.handlers = {} | |
|
158 | 'connect_request', 'shutdown_request', | |
|
159 | 'apply_request', | |
|
160 | ] | |
|
161 | self.shell_handlers = {} | |
|
141 | 162 | for msg_type in msg_types: |
|
142 | self.handlers[msg_type] = getattr(self, msg_type) | |
|
163 | self.shell_handlers[msg_type] = getattr(self, msg_type) | |
|
143 | 164 | |
|
144 | def do_one_iteration(self): | |
|
145 | """Do one iteration of the kernel's evaluation loop. | |
|
146 | """ | |
|
165 | control_msg_types = msg_types + [ 'clear_request', 'abort_request' ] | |
|
166 | self.control_handlers = {} | |
|
167 | for msg_type in control_msg_types: | |
|
168 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
|
169 | ||
|
170 | def dispatch_control(self, msg): | |
|
171 | """dispatch control requests""" | |
|
172 | idents,msg = self.session.feed_identities(msg, copy=False) | |
|
147 | 173 | try: |
|
148 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) | |
|
149 |
except |
|
|
150 |
self.log. |
|
|
174 | msg = self.session.unserialize(msg, content=True, copy=False) | |
|
175 | except: | |
|
176 | self.log.error("Invalid Control Message", exc_info=True) | |
|
151 | 177 | return |
|
152 | if msg is None: | |
|
178 | ||
|
179 | self.log.debug("Control received: %s", msg) | |
|
180 | ||
|
181 | header = msg['header'] | |
|
182 | msg_id = header['msg_id'] | |
|
183 | msg_type = header['msg_type'] | |
|
184 | ||
|
185 | handler = self.control_handlers.get(msg_type, None) | |
|
186 | if handler is None: | |
|
187 | self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) | |
|
188 | else: | |
|
189 | try: | |
|
190 | handler(self.control_stream, idents, msg) | |
|
191 | except Exception: | |
|
192 | self.log.error("Exception in control handler:", exc_info=True) | |
|
193 | ||
|
194 | def dispatch_shell(self, stream, msg): | |
|
195 | """dispatch shell requests""" | |
|
196 | # flush control requests first | |
|
197 | if self.control_stream: | |
|
198 | self.control_stream.flush() | |
|
199 | ||
|
200 | idents,msg = self.session.feed_identities(msg, copy=False) | |
|
201 | try: | |
|
202 | msg = self.session.unserialize(msg, content=True, copy=False) | |
|
203 | except: | |
|
204 | self.log.error("Invalid Message", exc_info=True) | |
|
153 | 205 | return |
|
154 | 206 | |
|
207 | header = msg['header'] | |
|
208 | msg_id = header['msg_id'] | |
|
155 | 209 | msg_type = msg['header']['msg_type'] |
|
156 | 210 | |
|
157 | # This assert will raise in versions of zeromq 2.0.7 and lesser. | |
|
158 | # We now require 2.0.8 or above, so we can uncomment for safety. | |
|
159 | # print(ident,msg, file=sys.__stdout__) | |
|
160 | assert ident is not None, "Missing message part." | |
|
161 | ||
|
162 | 211 | # Print some info about this message and leave a '--->' marker, so it's |
|
163 | 212 | # easier to trace visually the message chain when debugging. Each |
|
164 | 213 | # handler prints its message at the end. |
|
165 |
self.log.debug('\n*** MESSAGE TYPE:' |
|
|
166 |
self.log.debug(' Content: ' |
|
|
214 | self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) | |
|
215 | self.log.debug(' Content: %s\n --->\n ', msg['content']) | |
|
216 | ||
|
217 | if msg_id in self.aborted: | |
|
218 | self.aborted.remove(msg_id) | |
|
219 | # is it safe to assume a msg_id will not be resubmitted? | |
|
220 | reply_type = msg_type.split('_')[0] + '_reply' | |
|
221 | status = {'status' : 'aborted'} | |
|
222 | sub = {'engine' : self.ident} | |
|
223 | sub.update(status) | |
|
224 | reply_msg = self.session.send(stream, reply_type, subheader=sub, | |
|
225 | content=status, parent=msg, ident=idents) | |
|
226 | return | |
|
167 | 227 | |
|
168 | # Find and call actual handler for message | |
|
169 | handler = self.handlers.get(msg_type, None) | |
|
228 | handler = self.shell_handlers.get(msg_type, None) | |
|
170 | 229 | if handler is None: |
|
171 |
self.log.error("UNKNOWN MESSAGE TYPE:" |
|
|
230 | self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type) | |
|
172 | 231 | else: |
|
173 | handler(ident, msg) | |
|
174 | ||
|
175 | # Check whether we should exit, in case the incoming message set the | |
|
176 | # exit flag on | |
|
177 | if self.shell.exit_now: | |
|
178 | self.log.debug('\nExiting IPython kernel...') | |
|
179 | # We do a normal, clean exit, which allows any actions registered | |
|
180 | # via atexit (such as history saving) to take place. | |
|
181 | sys.exit(0) | |
|
182 | ||
|
183 | ||
|
184 | def start(self): | |
|
185 | """ Start the kernel main loop. | |
|
186 | """ | |
|
187 | # a KeyboardInterrupt (SIGINT) can occur on any python statement, so | |
|
188 | # let's ignore (SIG_IGN) them until we're in a place to handle them properly | |
|
189 | signal(SIGINT,SIG_IGN) | |
|
190 | poller = zmq.Poller() | |
|
191 | poller.register(self.shell_socket, zmq.POLLIN) | |
|
192 | # loop while self.eventloop has not been overridden | |
|
193 | while self.eventloop is None: | |
|
194 | try: | |
|
195 | # scale by extra factor of 10, because there is no | |
|
196 | # reason for this to be anything less than ~ 0.1s | |
|
197 | # since it is a real poller and will respond | |
|
198 | # to events immediately | |
|
199 | ||
|
200 | # double nested try/except, to properly catch KeyboardInterrupt | |
|
201 | # due to pyzmq Issue #130 | |
|
232 | # ensure default_int_handler during handler call | |
|
233 | sig = signal(SIGINT, default_int_handler) | |
|
202 | 234 |
|
|
203 | poller.poll(10*1000*self._poll_interval) | |
|
204 | # restore raising of KeyboardInterrupt | |
|
205 | signal(SIGINT, default_int_handler) | |
|
206 | self.do_one_iteration() | |
|
207 | except: | |
|
208 | raise | |
|
235 | handler(stream, idents, msg) | |
|
236 | except Exception: | |
|
237 | self.log.error("Exception in message handler:", exc_info=True) | |
|
209 | 238 |
|
|
210 | # prevent raising of KeyboardInterrupt | |
|
211 | signal(SIGINT,SIG_IGN) | |
|
212 | except KeyboardInterrupt: | |
|
213 | # Ctrl-C shouldn't crash the kernel | |
|
214 | io.raw_print("KeyboardInterrupt caught in kernel") | |
|
215 | # stop ignoring sigint, now that we are out of our own loop, | |
|
216 | # we don't want to prevent future code from handling it | |
|
239 | signal(SIGINT, sig) | |
|
240 | ||
|
241 | def enter_eventloop(self): | |
|
242 | """enter eventloop""" | |
|
243 | self.log.info("entering eventloop") | |
|
244 | # restore default_int_handler | |
|
217 | 245 | signal(SIGINT, default_int_handler) |
|
218 | 246 | while self.eventloop is not None: |
|
219 | 247 | try: |
|
220 | 248 | self.eventloop(self) |
|
221 | 249 | except KeyboardInterrupt: |
|
222 | 250 | # Ctrl-C shouldn't crash the kernel |
|
223 |
|
|
|
251 | self.log.error("KeyboardInterrupt caught in kernel") | |
|
224 | 252 | continue |
|
225 | 253 | else: |
|
226 | 254 | # eventloop exited cleanly, this means we should stop (right?) |
|
227 | 255 | self.eventloop = None |
|
228 | 256 | break |
|
257 | self.log.info("exiting eventloop") | |
|
258 | # if eventloop exits, IOLoop should stop | |
|
259 | ioloop.IOLoop.instance().stop() | |
|
260 | ||
|
261 | def start(self): | |
|
262 | """register dispatchers for streams""" | |
|
263 | self.shell.exit_now = False | |
|
264 | if self.control_stream: | |
|
265 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |
|
266 | ||
|
267 | def make_dispatcher(stream): | |
|
268 | def dispatcher(msg): | |
|
269 | return self.dispatch_shell(stream, msg) | |
|
270 | return dispatcher | |
|
271 | ||
|
272 | for s in self.shell_streams: | |
|
273 | s.on_recv(make_dispatcher(s), copy=False) | |
|
274 | ||
|
275 | def do_one_iteration(self): | |
|
276 | """step eventloop just once""" | |
|
277 | if self.control_stream: | |
|
278 | self.control_stream.flush() | |
|
279 | for stream in self.shell_streams: | |
|
280 | # handle at most one request per iteration | |
|
281 | stream.flush(zmq.POLLIN, 1) | |
|
282 | stream.flush(zmq.POLLOUT) | |
|
229 | 283 | |
|
230 | 284 | |
|
231 | 285 | def record_ports(self, ports): |
@@ -240,18 +294,30 b' class Kernel(Configurable):' | |||
|
240 | 294 | # Kernel request handlers |
|
241 | 295 | #--------------------------------------------------------------------------- |
|
242 | 296 | |
|
297 | def _make_subheader(self): | |
|
298 | """init subheader dict, for execute/apply_reply""" | |
|
299 | return { | |
|
300 | 'dependencies_met' : True, | |
|
301 | 'engine' : self.ident, | |
|
302 | 'started': datetime.now(), | |
|
303 | } | |
|
304 | ||
|
243 | 305 | def _publish_pyin(self, code, parent, execution_count): |
|
244 | 306 | """Publish the code request on the pyin stream.""" |
|
245 | 307 | |
|
246 |
self.session.send(self.iopub_socket, u'pyin', |
|
|
247 |
u'execution_count': execution_count}, |
|
|
308 | self.session.send(self.iopub_socket, u'pyin', | |
|
309 | {u'code':code, u'execution_count': execution_count}, | |
|
310 | parent=parent, ident=self._topic('pyin') | |
|
311 | ) | |
|
248 | 312 | |
|
249 | def execute_request(self, ident, parent): | |
|
313 | def execute_request(self, stream, ident, parent): | |
|
250 | 314 | |
|
251 | 315 | self.session.send(self.iopub_socket, |
|
252 | 316 | u'status', |
|
253 | 317 | {u'execution_state':u'busy'}, |
|
254 |
parent=parent |
|
|
318 | parent=parent, | |
|
319 | ident=self._topic('status'), | |
|
320 | ) | |
|
255 | 321 | |
|
256 | 322 | try: |
|
257 | 323 | content = parent[u'content'] |
@@ -259,9 +325,11 b' class Kernel(Configurable):' | |||
|
259 | 325 | silent = content[u'silent'] |
|
260 | 326 | except: |
|
261 | 327 | self.log.error("Got bad msg: ") |
|
262 |
self.log.error( |
|
|
328 | self.log.error("%s", parent) | |
|
263 | 329 | return |
|
264 | 330 | |
|
331 | sub = self._make_subheader() | |
|
332 | ||
|
265 | 333 | shell = self.shell # we'll need this a lot here |
|
266 | 334 | |
|
267 | 335 | # Replace raw_input. Note that is not sufficient to replace |
@@ -289,14 +357,8 b' class Kernel(Configurable):' | |||
|
289 | 357 | |
|
290 | 358 | reply_content = {} |
|
291 | 359 | try: |
|
292 | if silent: | |
|
293 | # run_code uses 'exec' mode, so no displayhook will fire, and it | |
|
294 | # doesn't call logging or history manipulations. Print | |
|
295 | # statements in that code will obviously still execute. | |
|
296 | shell.run_code(code) | |
|
297 | else: | |
|
298 | 360 |
|
|
299 |
|
|
|
361 | shell.run_cell(code, store_history=not silent, silent=silent) | |
|
300 | 362 | except: |
|
301 | 363 | status = u'error' |
|
302 | 364 | # FIXME: this code right now isn't being used yet by default, |
@@ -327,9 +389,9 b' class Kernel(Configurable):' | |||
|
327 | 389 | # or not. If it did, we proceed to evaluate user_variables/expressions |
|
328 | 390 | if reply_content['status'] == 'ok': |
|
329 | 391 | reply_content[u'user_variables'] = \ |
|
330 |
shell.user_variables(content |
|
|
392 | shell.user_variables(content.get(u'user_variables', [])) | |
|
331 | 393 | reply_content[u'user_expressions'] = \ |
|
332 |
shell.user_expressions(content |
|
|
394 | shell.user_expressions(content.get(u'user_expressions', {})) | |
|
333 | 395 | else: |
|
334 | 396 | # If there was an error, don't even try to compute variables or |
|
335 | 397 | # expressions |
@@ -355,40 +417,49 b' class Kernel(Configurable):' | |||
|
355 | 417 | |
|
356 | 418 | # Send the reply. |
|
357 | 419 | reply_content = json_clean(reply_content) |
|
358 | reply_msg = self.session.send(self.shell_socket, u'execute_reply', | |
|
359 | reply_content, parent, ident=ident) | |
|
360 | self.log.debug(str(reply_msg)) | |
|
361 | 420 | |
|
362 | if reply_msg['content']['status'] == u'error': | |
|
363 | self._abort_queue() | |
|
421 | sub['status'] = reply_content['status'] | |
|
422 | if reply_content['status'] == 'error' and \ | |
|
423 | reply_content['ename'] == 'UnmetDependency': | |
|
424 | sub['dependencies_met'] = False | |
|
425 | ||
|
426 | reply_msg = self.session.send(stream, u'execute_reply', | |
|
427 | reply_content, parent, subheader=sub, | |
|
428 | ident=ident) | |
|
429 | ||
|
430 | self.log.debug("%s", reply_msg) | |
|
431 | ||
|
432 | if not silent and reply_msg['content']['status'] == u'error': | |
|
433 | self._abort_queues() | |
|
364 | 434 | |
|
365 | 435 | self.session.send(self.iopub_socket, |
|
366 | 436 | u'status', |
|
367 | 437 | {u'execution_state':u'idle'}, |
|
368 |
parent=parent |
|
|
438 | parent=parent, | |
|
439 | ident=self._topic('status')) | |
|
369 | 440 | |
|
370 | def complete_request(self, ident, parent): | |
|
441 | def complete_request(self, stream, ident, parent): | |
|
371 | 442 | txt, matches = self._complete(parent) |
|
372 | 443 | matches = {'matches' : matches, |
|
373 | 444 | 'matched_text' : txt, |
|
374 | 445 | 'status' : 'ok'} |
|
375 | 446 | matches = json_clean(matches) |
|
376 |
completion_msg = self.session.send(s |
|
|
447 | completion_msg = self.session.send(stream, 'complete_reply', | |
|
377 | 448 | matches, parent, ident) |
|
378 |
self.log.debug( |
|
|
449 | self.log.debug("%s", completion_msg) | |
|
379 | 450 | |
|
380 | def object_info_request(self, ident, parent): | |
|
451 | def object_info_request(self, stream, ident, parent): | |
|
381 | 452 | content = parent['content'] |
|
382 | 453 | object_info = self.shell.object_inspect(content['oname'], |
|
383 | 454 | detail_level = content.get('detail_level', 0) |
|
384 | 455 | ) |
|
385 | 456 | # Before we send this object over, we scrub it for JSON usage |
|
386 | 457 | oinfo = json_clean(object_info) |
|
387 |
msg = self.session.send(s |
|
|
458 | msg = self.session.send(stream, 'object_info_reply', | |
|
388 | 459 | oinfo, parent, ident) |
|
389 | self.log.debug(msg) | |
|
460 | self.log.debug("%s", msg) | |
|
390 | 461 | |
|
391 | def history_request(self, ident, parent): | |
|
462 | def history_request(self, stream, ident, parent): | |
|
392 | 463 | # We need to pull these out, as passing **kwargs doesn't work with |
|
393 | 464 | # unicode keys before Python 2.6.5. |
|
394 | 465 | hist_access_type = parent['content']['hist_access_type'] |
@@ -416,51 +487,183 b' class Kernel(Configurable):' | |||
|
416 | 487 | hist = list(hist) |
|
417 | 488 | content = {'history' : hist} |
|
418 | 489 | content = json_clean(content) |
|
419 |
msg = self.session.send(s |
|
|
490 | msg = self.session.send(stream, 'history_reply', | |
|
420 | 491 | content, parent, ident) |
|
421 | 492 | self.log.debug("Sending history reply with %i entries", len(hist)) |
|
422 | 493 | |
|
423 | def connect_request(self, ident, parent): | |
|
494 | def connect_request(self, stream, ident, parent): | |
|
424 | 495 | if self._recorded_ports is not None: |
|
425 | 496 | content = self._recorded_ports.copy() |
|
426 | 497 | else: |
|
427 | 498 | content = {} |
|
428 |
msg = self.session.send(s |
|
|
499 | msg = self.session.send(stream, 'connect_reply', | |
|
429 | 500 | content, parent, ident) |
|
430 | self.log.debug(msg) | |
|
501 | self.log.debug("%s", msg) | |
|
431 | 502 | |
|
432 | def shutdown_request(self, ident, parent): | |
|
503 | def shutdown_request(self, stream, ident, parent): | |
|
433 | 504 | self.shell.exit_now = True |
|
505 | content = dict(status='ok') | |
|
506 | content.update(parent['content']) | |
|
507 | self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) | |
|
508 | # same content, but different msg_id for broadcasting on IOPub | |
|
434 | 509 | self._shutdown_message = self.session.msg(u'shutdown_reply', |
|
435 |
|
|
|
436 |
|
|
|
510 | content, parent | |
|
511 | ) | |
|
512 | ||
|
513 | self._at_shutdown() | |
|
514 | # call sys.exit after a short delay | |
|
515 | loop = ioloop.IOLoop.instance() | |
|
516 | loop.add_timeout(time.time()+0.1, loop.stop) | |
|
517 | ||
|
518 | #--------------------------------------------------------------------------- | |
|
519 | # Engine methods | |
|
520 | #--------------------------------------------------------------------------- | |
|
521 | ||
|
522 | def apply_request(self, stream, ident, parent): | |
|
523 | try: | |
|
524 | content = parent[u'content'] | |
|
525 | bufs = parent[u'buffers'] | |
|
526 | msg_id = parent['header']['msg_id'] | |
|
527 | except: | |
|
528 | self.log.error("Got bad msg: %s", parent, exc_info=True) | |
|
529 | return | |
|
530 | ||
|
531 | # Set the parent message of the display hook and out streams. | |
|
532 | self.shell.displayhook.set_parent(parent) | |
|
533 | self.shell.display_pub.set_parent(parent) | |
|
534 | sys.stdout.set_parent(parent) | |
|
535 | sys.stderr.set_parent(parent) | |
|
536 | ||
|
537 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
|
538 | # self.iopub_socket.send(pyin_msg) | |
|
539 | # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) | |
|
540 | sub = self._make_subheader() | |
|
541 | try: | |
|
542 | working = self.shell.user_ns | |
|
543 | ||
|
544 | prefix = "_"+str(msg_id).replace("-","")+"_" | |
|
545 | ||
|
546 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |
|
547 | ||
|
548 | fname = getattr(f, '__name__', 'f') | |
|
549 | ||
|
550 | fname = prefix+"f" | |
|
551 | argname = prefix+"args" | |
|
552 | kwargname = prefix+"kwargs" | |
|
553 | resultname = prefix+"result" | |
|
554 | ||
|
555 | ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } | |
|
556 | # print ns | |
|
557 | working.update(ns) | |
|
558 | code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname) | |
|
559 | try: | |
|
560 | exec code in self.shell.user_global_ns, self.shell.user_ns | |
|
561 | result = working.get(resultname) | |
|
562 | finally: | |
|
563 | for key in ns.iterkeys(): | |
|
564 | working.pop(key) | |
|
565 | ||
|
566 | packed_result,buf = serialize_object(result) | |
|
567 | result_buf = [packed_result]+buf | |
|
568 | except: | |
|
569 | exc_content = self._wrap_exception('apply') | |
|
570 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
|
571 | self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent, | |
|
572 | ident=self._topic('pyerr')) | |
|
573 | reply_content = exc_content | |
|
574 | result_buf = [] | |
|
575 | ||
|
576 | if exc_content['ename'] == 'UnmetDependency': | |
|
577 | sub['dependencies_met'] = False | |
|
578 | else: | |
|
579 | reply_content = {'status' : 'ok'} | |
|
580 | ||
|
581 | # put 'ok'/'error' status in header, for scheduler introspection: | |
|
582 | sub['status'] = reply_content['status'] | |
|
583 | ||
|
584 | # flush i/o | |
|
585 | sys.stdout.flush() | |
|
586 | sys.stderr.flush() | |
|
587 | ||
|
588 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |
|
589 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |
|
590 | ||
|
591 | #--------------------------------------------------------------------------- | |
|
592 | # Control messages | |
|
593 | #--------------------------------------------------------------------------- | |
|
594 | ||
|
595 | def abort_request(self, stream, ident, parent): | |
|
596 | """abort a specifig msg by id""" | |
|
597 | msg_ids = parent['content'].get('msg_ids', None) | |
|
598 | if isinstance(msg_ids, basestring): | |
|
599 | msg_ids = [msg_ids] | |
|
600 | if not msg_ids: | |
|
601 | self.abort_queues() | |
|
602 | for mid in msg_ids: | |
|
603 | self.aborted.add(str(mid)) | |
|
604 | ||
|
605 | content = dict(status='ok') | |
|
606 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | |
|
607 | parent=parent, ident=ident) | |
|
608 | self.log.debug("%s", reply_msg) | |
|
609 | ||
|
610 | def clear_request(self, stream, idents, parent): | |
|
611 | """Clear our namespace.""" | |
|
612 | self.shell.reset(False) | |
|
613 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |
|
614 | content = dict(status='ok')) | |
|
615 | ||
|
437 | 616 | |
|
438 | 617 | #--------------------------------------------------------------------------- |
|
439 | 618 | # Protected interface |
|
440 | 619 | #--------------------------------------------------------------------------- |
|
441 | 620 | |
|
442 | def _abort_queue(self): | |
|
621 | ||
|
622 | def _wrap_exception(self, method=None): | |
|
623 | # import here, because _wrap_exception is only used in parallel, | |
|
624 | # and parallel has higher min pyzmq version | |
|
625 | from IPython.parallel.error import wrap_exception | |
|
626 | e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) | |
|
627 | content = wrap_exception(e_info) | |
|
628 | return content | |
|
629 | ||
|
630 | def _topic(self, topic): | |
|
631 | """prefixed topic for IOPub messages""" | |
|
632 | if self.int_id >= 0: | |
|
633 | base = "engine.%i" % self.int_id | |
|
634 | else: | |
|
635 | base = "kernel.%s" % self.ident | |
|
636 | ||
|
637 | return py3compat.cast_bytes("%s.%s" % (base, topic)) | |
|
638 | ||
|
639 | def _abort_queues(self): | |
|
640 | for stream in self.shell_streams: | |
|
641 | if stream: | |
|
642 | self._abort_queue(stream) | |
|
643 | ||
|
644 | def _abort_queue(self, stream): | |
|
645 | poller = zmq.Poller() | |
|
646 | poller.register(stream.socket, zmq.POLLIN) | |
|
443 | 647 | while True: |
|
444 | try: | |
|
445 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) | |
|
446 | except Exception: | |
|
447 | self.log.warn("Invalid Message:", exc_info=True) | |
|
448 | continue | |
|
648 | idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) | |
|
449 | 649 | if msg is None: |
|
450 |
|
|
|
451 | else: | |
|
452 | assert ident is not None, \ | |
|
453 | "Unexpected missing message part." | |
|
650 | return | |
|
454 | 651 | |
|
455 |
self.log. |
|
|
652 | self.log.info("Aborting:") | |
|
653 | self.log.info("%s", msg) | |
|
456 | 654 | msg_type = msg['header']['msg_type'] |
|
457 | 655 | reply_type = msg_type.split('_')[0] + '_reply' |
|
458 | reply_msg = self.session.send(self.shell_socket, reply_type, | |
|
459 |
|
|
|
460 | self.log.debug(reply_msg) | |
|
656 | ||
|
657 | status = {'status' : 'aborted'} | |
|
658 | sub = {'engine' : self.ident} | |
|
659 | sub.update(status) | |
|
660 | reply_msg = self.session.send(stream, reply_type, subheader=sub, | |
|
661 | content=status, parent=msg, ident=idents) | |
|
662 | self.log.debug("%s", reply_msg) | |
|
461 | 663 | # We need to wait a bit for requests to come in. This can probably |
|
462 | 664 | # be set shorter for true asynchronous clients. |
|
463 |
|
|
|
665 | poller.poll(50) | |
|
666 | ||
|
464 | 667 | |
|
465 | 668 | def _no_raw_input(self): |
|
466 | 669 | """Raise StdinNotImplentedError if active frontend doesn't support |
@@ -490,7 +693,7 b' class Kernel(Configurable):' | |||
|
490 | 693 | value = reply['content']['value'] |
|
491 | 694 | except: |
|
492 | 695 | self.log.error("Got bad raw_input reply: ") |
|
493 |
self.log.error( |
|
|
696 | self.log.error("%s", parent) | |
|
494 | 697 | value = '' |
|
495 | 698 | if value == '\x04': |
|
496 | 699 | # EOF |
@@ -545,12 +748,9 b' class Kernel(Configurable):' | |||
|
545 | 748 | """ |
|
546 | 749 | # io.rprint("Kernel at_shutdown") # dbg |
|
547 | 750 | if self._shutdown_message is not None: |
|
548 |
self.session.send(self. |
|
|
549 |
self. |
|
|
550 | self.log.debug(str(self._shutdown_message)) | |
|
551 | # A very short sleep to give zmq time to flush its message buffers | |
|
552 | # before Python truly shuts down. | |
|
553 | time.sleep(0.01) | |
|
751 | self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) | |
|
752 | self.log.debug("%s", self._shutdown_message) | |
|
753 | [ s.flush(zmq.POLLOUT) for s in self.shell_streams ] | |
|
554 | 754 | |
|
555 | 755 | #----------------------------------------------------------------------------- |
|
556 | 756 | # Aliases and Flags for the IPKernelApp |
@@ -604,8 +804,10 b' class IPKernelApp(KernelApp, InteractiveShellApp):' | |||
|
604 | 804 | |
|
605 | 805 | def init_kernel(self): |
|
606 | 806 | |
|
807 | shell_stream = ZMQStream(self.shell_socket) | |
|
808 | ||
|
607 | 809 | kernel = Kernel(config=self.config, session=self.session, |
|
608 |
shell_s |
|
|
810 | shell_streams=[shell_stream], | |
|
609 | 811 | iopub_socket=self.iopub_socket, |
|
610 | 812 | stdin_socket=self.stdin_socket, |
|
611 | 813 | log=self.log, |
@@ -687,6 +889,13 b' def embed_kernel(module=None, local_ns=None, **kwargs):' | |||
|
687 | 889 | else: |
|
688 | 890 | app = IPKernelApp.instance(**kwargs) |
|
689 | 891 | app.initialize([]) |
|
892 | # Undo unnecessary sys module mangling from init_sys_modules. | |
|
893 | # This would not be necessary if we could prevent it | |
|
894 | # in the first place by using a different InteractiveShell | |
|
895 | # subclass, as in the regular embed case. | |
|
896 | main = app.kernel.shell._orig_sys_modules_main_mod | |
|
897 | if main is not None: | |
|
898 | sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main | |
|
690 | 899 | |
|
691 | 900 | # load the calling scope if not given |
|
692 | 901 | (caller_module, caller_locals) = extract_module_locals(1) |
@@ -15,15 +15,18 b' Authors' | |||
|
15 | 15 | # Imports |
|
16 | 16 | #----------------------------------------------------------------------------- |
|
17 | 17 | |
|
18 |
# Standard library imports |
|
|
18 | # Standard library imports | |
|
19 | import atexit | |
|
19 | 20 | import json |
|
20 | 21 | import os |
|
21 | 22 | import sys |
|
23 | import signal | |
|
22 | 24 | |
|
23 |
# System library imports |
|
|
25 | # System library imports | |
|
24 | 26 | import zmq |
|
27 | from zmq.eventloop import ioloop | |
|
25 | 28 | |
|
26 |
# IPython imports |
|
|
29 | # IPython imports | |
|
27 | 30 | from IPython.core.ultratb import FormattedTB |
|
28 | 31 | from IPython.core.application import ( |
|
29 | 32 | BaseIPythonApplication, base_flags, base_aliases, catch_config_error |
@@ -82,17 +85,18 b' kernel_flags.update(session_flags)' | |||
|
82 | 85 | #----------------------------------------------------------------------------- |
|
83 | 86 | |
|
84 | 87 | class KernelApp(BaseIPythonApplication): |
|
85 |
name='p |
|
|
88 | name='ipkernel' | |
|
86 | 89 | aliases = Dict(kernel_aliases) |
|
87 | 90 | flags = Dict(kernel_flags) |
|
88 | 91 | classes = [Session] |
|
89 | 92 | # the kernel class, as an importstring |
|
90 |
kernel_class = DottedObjectName('IPython.zmq.p |
|
|
93 | kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel') | |
|
91 | 94 | kernel = Any() |
|
92 | 95 | poller = Any() # don't restrict this even though current pollers are all Threads |
|
93 | 96 | heartbeat = Instance(Heartbeat) |
|
94 | 97 | session = Instance('IPython.zmq.session.Session') |
|
95 | 98 | ports = Dict() |
|
99 | _full_connection_file = Unicode() | |
|
96 | 100 | |
|
97 | 101 | # inherit config file name from parent: |
|
98 | 102 | parent_appname = Unicode(config=True) |
@@ -163,6 +167,8 b' class KernelApp(BaseIPythonApplication):' | |||
|
163 | 167 | fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir]) |
|
164 | 168 | except IOError: |
|
165 | 169 | self.log.debug("Connection file not found: %s", self.connection_file) |
|
170 | # This means I own it, so I will clean it up: | |
|
171 | atexit.register(self.cleanup_connection_file) | |
|
166 | 172 | return |
|
167 | 173 | self.log.debug(u"Loading connection file %s", fname) |
|
168 | 174 | with open(fname) as f: |
@@ -189,6 +195,16 b' class KernelApp(BaseIPythonApplication):' | |||
|
189 | 195 | shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port, |
|
190 | 196 | iopub_port=self.iopub_port) |
|
191 | 197 | |
|
198 | self._full_connection_file = cf | |
|
199 | ||
|
200 | def cleanup_connection_file(self): | |
|
201 | cf = self._full_connection_file | |
|
202 | self.log.debug("cleaning up connection file: %r", cf) | |
|
203 | try: | |
|
204 | os.remove(cf) | |
|
205 | except (IOError, OSError): | |
|
206 | pass | |
|
207 | ||
|
192 | 208 | def init_connection_file(self): |
|
193 | 209 | if not self.connection_file: |
|
194 | 210 | self.connection_file = "kernel-%s.json"%os.getpid() |
@@ -217,17 +233,22 b' class KernelApp(BaseIPythonApplication):' | |||
|
217 | 233 | self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) |
|
218 | 234 | self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port) |
|
219 | 235 |
|
|
236 | def init_heartbeat(self): | |
|
237 | """start the heart beating""" | |
|
220 | 238 | # heartbeat doesn't share context, because it mustn't be blocked |
|
221 | 239 | # by the GIL, which is accessed by libzmq when freeing zero-copy messages |
|
222 | 240 | hb_ctx = zmq.Context() |
|
223 | 241 | self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port)) |
|
224 | 242 | self.hb_port = self.heartbeat.port |
|
225 | 243 | self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port) |
|
244 | self.heartbeat.start() | |
|
226 | 245 | |
|
227 | 246 | # Helper to make it easier to connect to an existing kernel. |
|
228 | 247 | # set log-level to critical, to make sure it is output |
|
229 | 248 | self.log.critical("To connect another client to this kernel, use:") |
|
230 | 249 |
|
|
250 | def log_connection_info(self): | |
|
251 | """display connection info, and store ports""" | |
|
231 | 252 | basename = os.path.basename(self.connection_file) |
|
232 | 253 | if basename == self.connection_file or \ |
|
233 | 254 | os.path.dirname(self.connection_file) == self.profile_dir.security_dir: |
@@ -267,6 +288,9 b' class KernelApp(BaseIPythonApplication):' | |||
|
267 | 288 | displayhook_factory = import_item(str(self.displayhook_class)) |
|
268 | 289 | sys.displayhook = displayhook_factory(self.session, self.iopub_socket) |
|
269 | 290 | |
|
291 | def init_signal(self): | |
|
292 | signal.signal(signal.SIGINT, signal.SIG_IGN) | |
|
293 | ||
|
270 | 294 | def init_kernel(self): |
|
271 | 295 | """Create the Kernel object itself""" |
|
272 | 296 | kernel_factory = import_item(str(self.kernel_class)) |
@@ -286,9 +310,12 b' class KernelApp(BaseIPythonApplication):' | |||
|
286 | 310 | self.init_session() |
|
287 | 311 | self.init_poller() |
|
288 | 312 | self.init_sockets() |
|
289 | # writing connection file must be *after* init_sockets | |
|
313 | self.init_heartbeat() | |
|
314 | # writing/displaying connection info must be *after* init_sockets/heartbeat | |
|
315 | self.log_connection_info() | |
|
290 | 316 | self.write_connection_file() |
|
291 | 317 | self.init_io() |
|
318 | self.init_signal() | |
|
292 | 319 | self.init_kernel() |
|
293 | 320 | # flush stdout/stderr, so that anything written to these streams during |
|
294 | 321 | # initialization do not get associated with the first execution request |
@@ -296,11 +323,11 b' class KernelApp(BaseIPythonApplication):' | |||
|
296 | 323 | sys.stderr.flush() |
|
297 | 324 | |
|
298 | 325 | def start(self): |
|
299 | self.heartbeat.start() | |
|
300 | 326 | if self.poller is not None: |
|
301 | 327 | self.poller.start() |
|
302 | try: | |
|
303 | 328 |
|
|
329 | try: | |
|
330 | ioloop.IOLoop.instance().start() | |
|
304 | 331 | except KeyboardInterrupt: |
|
305 | 332 | pass |
|
306 | 333 |
@@ -781,9 +781,6 b' class KernelManager(HasTraits):' | |||
|
781 | 781 | |
|
782 | 782 | Parameters: |
|
783 | 783 | ----------- |
|
784 | ipython : bool, optional (default True) | |
|
785 | Whether to use an IPython kernel instead of a plain Python kernel. | |
|
786 | ||
|
787 | 784 | launcher : callable, optional (default None) |
|
788 | 785 | A custom function for launching the kernel process (generally a |
|
789 | 786 | wrapper around ``entry_point.base_launch_kernel``). In most cases, |
@@ -805,10 +802,7 b' class KernelManager(HasTraits):' | |||
|
805 | 802 | self._launch_args = kw.copy() |
|
806 | 803 | launch_kernel = kw.pop('launcher', None) |
|
807 | 804 | if launch_kernel is None: |
|
808 | if kw.pop('ipython', True): | |
|
809 | 805 |
|
|
810 | else: | |
|
811 | from pykernel import launch_kernel | |
|
812 | 806 | self.kernel = launch_kernel(fname=self.connection_file, **kw) |
|
813 | 807 | |
|
814 | 808 | def shutdown_kernel(self, restart=False): |
@@ -629,7 +629,7 b' class Session(Configurable):' | |||
|
629 | 629 | if isinstance(socket, ZMQStream): |
|
630 | 630 | socket = socket.socket |
|
631 | 631 | try: |
|
632 | msg_list = socket.recv_multipart(mode) | |
|
632 | msg_list = socket.recv_multipart(mode, copy=copy) | |
|
633 | 633 | except zmq.ZMQError as e: |
|
634 | 634 | if e.errno == zmq.EAGAIN: |
|
635 | 635 | # We can convert EAGAIN to None as we know in this case |
@@ -23,7 +23,7 b' from subprocess import Popen, PIPE' | |||
|
23 | 23 | import nose.tools as nt |
|
24 | 24 | |
|
25 | 25 | from IPython.zmq.blockingkernelmanager import BlockingKernelManager |
|
26 | from IPython.utils import path | |
|
26 | from IPython.utils import path, py3compat | |
|
27 | 27 | |
|
28 | 28 | |
|
29 | 29 | #------------------------------------------------------------------------------- |
@@ -68,17 +68,19 b' def setup_kernel(cmd):' | |||
|
68 | 68 | ) |
|
69 | 69 | # wait for connection file to exist, timeout after 5s |
|
70 | 70 | tic = time.time() |
|
71 |
while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + |
|
|
71 | while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10: | |
|
72 | 72 | time.sleep(0.1) |
|
73 | 73 | |
|
74 | if kernel.poll() is not None: | |
|
75 | o,e = kernel.communicate() | |
|
76 | e = py3compat.cast_unicode(e) | |
|
77 | raise IOError("Kernel failed to start:\n%s" % e) | |
|
78 | ||
|
74 | 79 | if not os.path.exists(connection_file): |
|
75 | 80 | if kernel.poll() is None: |
|
76 | 81 | kernel.terminate() |
|
77 | 82 | raise IOError("Connection file %r never arrived" % connection_file) |
|
78 | 83 | |
|
79 | if kernel.poll() is not None: | |
|
80 | raise IOError("Kernel failed to start") | |
|
81 | ||
|
82 | 84 | km = BlockingKernelManager(connection_file=connection_file) |
|
83 | 85 | km.load_connection_file() |
|
84 | 86 | km.start_channels() |
@@ -157,3 +159,33 b' def test_embed_kernel_namespace():' | |||
|
157 | 159 | content = msg['content'] |
|
158 | 160 | nt.assert_false(content['found']) |
|
159 | 161 | |
|
162 | def test_embed_kernel_reentrant(): | |
|
163 | """IPython.embed_kernel() can be called multiple times""" | |
|
164 | cmd = '\n'.join([ | |
|
165 | 'from IPython import embed_kernel', | |
|
166 | 'count = 0', | |
|
167 | 'def go():', | |
|
168 | ' global count', | |
|
169 | ' embed_kernel()', | |
|
170 | ' count = count + 1', | |
|
171 | '', | |
|
172 | 'while True:' | |
|
173 | ' go()', | |
|
174 | '', | |
|
175 | ]) | |
|
176 | ||
|
177 | with setup_kernel(cmd) as km: | |
|
178 | shell = km.shell_channel | |
|
179 | for i in range(5): | |
|
180 | msg_id = shell.object_info('count') | |
|
181 | msg = shell.get_msg(block=True, timeout=2) | |
|
182 | content = msg['content'] | |
|
183 | nt.assert_true(content['found']) | |
|
184 | nt.assert_equals(content['string_form'], unicode(i)) | |
|
185 | ||
|
186 | # exit from embed_kernel | |
|
187 | shell.execute("get_ipython().exit_now = True") | |
|
188 | msg = shell.get_msg(block=True, timeout=2) | |
|
189 | time.sleep(0.2) | |
|
190 | ||
|
191 |
@@ -19,8 +19,12 b' from __future__ import print_function' | |||
|
19 | 19 | import inspect |
|
20 | 20 | import os |
|
21 | 21 | import sys |
|
22 | import time | |
|
22 | 23 | from subprocess import Popen, PIPE |
|
23 | 24 | |
|
25 | # System library imports | |
|
26 | from zmq.eventloop import ioloop | |
|
27 | ||
|
24 | 28 | # Our own |
|
25 | 29 | from IPython.core.interactiveshell import ( |
|
26 | 30 | InteractiveShell, InteractiveShellABC |
@@ -39,7 +43,7 b' from IPython.utils import io' | |||
|
39 | 43 | from IPython.utils.jsonutil import json_clean |
|
40 | 44 | from IPython.utils.path import get_py_filename |
|
41 | 45 | from IPython.utils.process import arg_split |
|
42 | from IPython.utils.traitlets import Instance, Type, Dict, CBool | |
|
46 | from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes | |
|
43 | 47 | from IPython.utils.warn import warn, error |
|
44 | 48 | from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary |
|
45 | 49 | from IPython.zmq.session import extract_header |
@@ -56,6 +60,7 b' class ZMQDisplayPublisher(DisplayPublisher):' | |||
|
56 | 60 | session = Instance(Session) |
|
57 | 61 | pub_socket = Instance('zmq.Socket') |
|
58 | 62 | parent_header = Dict({}) |
|
63 | topic = CBytes(b'displaypub') | |
|
59 | 64 | |
|
60 | 65 | def set_parent(self, parent): |
|
61 | 66 | """Set the parent for outbound messages.""" |
@@ -78,7 +83,7 b' class ZMQDisplayPublisher(DisplayPublisher):' | |||
|
78 | 83 | content['metadata'] = metadata |
|
79 | 84 | self.session.send( |
|
80 | 85 | self.pub_socket, u'display_data', json_clean(content), |
|
81 | parent=self.parent_header | |
|
86 | parent=self.parent_header, ident=self.topic, | |
|
82 | 87 | ) |
|
83 | 88 | |
|
84 | 89 | def clear_output(self, stdout=True, stderr=True, other=True): |
@@ -93,7 +98,7 b' class ZMQDisplayPublisher(DisplayPublisher):' | |||
|
93 | 98 | |
|
94 | 99 | self.session.send( |
|
95 | 100 | self.pub_socket, u'clear_output', content, |
|
96 | parent=self.parent_header | |
|
101 | parent=self.parent_header, ident=self.topic, | |
|
97 | 102 | ) |
|
98 | 103 | |
|
99 | 104 | class ZMQInteractiveShell(InteractiveShell): |
@@ -115,6 +120,12 b' class ZMQInteractiveShell(InteractiveShell):' | |||
|
115 | 120 | def _exiter_default(self): |
|
116 | 121 | return ZMQExitAutocall(self) |
|
117 | 122 | |
|
123 | def _exit_now_changed(self, name, old, new): | |
|
124 | """stop eventloop when exit_now fires""" | |
|
125 | if new: | |
|
126 | loop = ioloop.IOLoop.instance() | |
|
127 | loop.add_timeout(time.time()+0.1, loop.stop) | |
|
128 | ||
|
118 | 129 | keepkernel_on_exit = None |
|
119 | 130 | |
|
120 | 131 | # Over ZeroMQ, GUI control isn't done with PyOS_InputHook as there is no |
@@ -154,6 +165,7 b' class ZMQInteractiveShell(InteractiveShell):' | |||
|
154 | 165 | |
|
155 | 166 | def ask_exit(self): |
|
156 | 167 | """Engage the exit actions.""" |
|
168 | self.exit_now = True | |
|
157 | 169 | payload = dict( |
|
158 | 170 | source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit', |
|
159 | 171 | exit=True, |
@@ -172,7 +184,11 b' class ZMQInteractiveShell(InteractiveShell):' | |||
|
172 | 184 | dh = self.displayhook |
|
173 | 185 | # Send exception info over pub socket for other clients than the caller |
|
174 | 186 | # to pick up |
|
175 | exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header) | |
|
187 | topic = None | |
|
188 | if dh.topic: | |
|
189 | topic = dh.topic.replace(b'pyout', b'pyerr') | |
|
190 | ||
|
191 | exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header, ident=topic) | |
|
176 | 192 | |
|
177 | 193 | # FIXME - Hack: store exception info in shell object. Right now, the |
|
178 | 194 | # caller is reading this info after the fact, we need to fix this logic |
@@ -34,7 +34,7 b' TaskRecord keys:' | |||
|
34 | 34 | =============== =============== ============= |
|
35 | 35 | Key Type Description |
|
36 | 36 | =============== =============== ============= |
|
37 |
msg_id uuid( |
|
|
37 | msg_id uuid(ascii) The msg ID | |
|
38 | 38 | header dict The request header |
|
39 | 39 | content dict The request content (likely empty) |
|
40 | 40 | buffers list(bytes) buffers containing serialized request objects |
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket" | |||
|
43 | 43 | engine_uuid uuid(bytes) IDENT of engine's socket |
|
44 | 44 | started datetime time task began execution on engine |
|
45 | 45 | completed datetime time task finished execution (success or failure) on engine |
|
46 |
resubmitted |
|
|
46 | resubmitted uuid(ascii) msg_id of resubmitted task (if applicable) | |
|
47 | 47 | result_header dict header for result |
|
48 | 48 | result_content dict content for result |
|
49 | 49 | result_buffers list(bytes) buffers containing serialized request objects |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now