Show More
@@ -0,0 +1,179 b'' | |||||
|
1 | """serialization utilities for apply messages | |||
|
2 | ||||
|
3 | Authors: | |||
|
4 | ||||
|
5 | * Min RK | |||
|
6 | """ | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | # Copyright (C) 2010-2011 The IPython Development Team | |||
|
9 | # | |||
|
10 | # Distributed under the terms of the BSD License. The full license is in | |||
|
11 | # the file COPYING, distributed as part of this software. | |||
|
12 | #----------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | #----------------------------------------------------------------------------- | |||
|
15 | # Imports | |||
|
16 | #----------------------------------------------------------------------------- | |||
|
17 | ||||
|
18 | # Standard library imports | |||
|
19 | import logging | |||
|
20 | import os | |||
|
21 | import re | |||
|
22 | import socket | |||
|
23 | import sys | |||
|
24 | ||||
|
25 | try: | |||
|
26 | import cPickle | |||
|
27 | pickle = cPickle | |||
|
28 | except: | |||
|
29 | cPickle = None | |||
|
30 | import pickle | |||
|
31 | ||||
|
32 | ||||
|
33 | # IPython imports | |||
|
34 | from IPython.utils import py3compat | |||
|
35 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence | |||
|
36 | from IPython.utils.newserialized import serialize, unserialize | |||
|
37 | ||||
|
38 | if py3compat.PY3: | |||
|
39 | buffer = memoryview | |||
|
40 | ||||
|
41 | #----------------------------------------------------------------------------- | |||
|
42 | # Serialization Functions | |||
|
43 | #----------------------------------------------------------------------------- | |||
|
44 | ||||
|
45 | def serialize_object(obj, threshold=64e-6): | |||
|
46 | """Serialize an object into a list of sendable buffers. | |||
|
47 | ||||
|
48 | Parameters | |||
|
49 | ---------- | |||
|
50 | ||||
|
51 | obj : object | |||
|
52 | The object to be serialized | |||
|
53 | threshold : float | |||
|
54 | The threshold for not double-pickling the content. | |||
|
55 | ||||
|
56 | ||||
|
57 | Returns | |||
|
58 | ------- | |||
|
59 | ('pmd', [bufs]) : | |||
|
60 | where pmd is the pickled metadata wrapper, | |||
|
61 | bufs is a list of data buffers | |||
|
62 | """ | |||
|
63 | databuffers = [] | |||
|
64 | if isinstance(obj, (list, tuple)): | |||
|
65 | clist = canSequence(obj) | |||
|
66 | slist = map(serialize, clist) | |||
|
67 | for s in slist: | |||
|
68 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |||
|
69 | databuffers.append(s.getData()) | |||
|
70 | s.data = None | |||
|
71 | return pickle.dumps(slist,-1), databuffers | |||
|
72 | elif isinstance(obj, dict): | |||
|
73 | sobj = {} | |||
|
74 | for k in sorted(obj.iterkeys()): | |||
|
75 | s = serialize(can(obj[k])) | |||
|
76 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |||
|
77 | databuffers.append(s.getData()) | |||
|
78 | s.data = None | |||
|
79 | sobj[k] = s | |||
|
80 | return pickle.dumps(sobj,-1),databuffers | |||
|
81 | else: | |||
|
82 | s = serialize(can(obj)) | |||
|
83 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |||
|
84 | databuffers.append(s.getData()) | |||
|
85 | s.data = None | |||
|
86 | return pickle.dumps(s,-1),databuffers | |||
|
87 | ||||
|
88 | ||||
|
89 | def unserialize_object(bufs): | |||
|
90 | """reconstruct an object serialized by serialize_object from data buffers.""" | |||
|
91 | bufs = list(bufs) | |||
|
92 | sobj = pickle.loads(bufs.pop(0)) | |||
|
93 | if isinstance(sobj, (list, tuple)): | |||
|
94 | for s in sobj: | |||
|
95 | if s.data is None: | |||
|
96 | s.data = bufs.pop(0) | |||
|
97 | return uncanSequence(map(unserialize, sobj)), bufs | |||
|
98 | elif isinstance(sobj, dict): | |||
|
99 | newobj = {} | |||
|
100 | for k in sorted(sobj.iterkeys()): | |||
|
101 | s = sobj[k] | |||
|
102 | if s.data is None: | |||
|
103 | s.data = bufs.pop(0) | |||
|
104 | newobj[k] = uncan(unserialize(s)) | |||
|
105 | return newobj, bufs | |||
|
106 | else: | |||
|
107 | if sobj.data is None: | |||
|
108 | sobj.data = bufs.pop(0) | |||
|
109 | return uncan(unserialize(sobj)), bufs | |||
|
110 | ||||
|
111 | def pack_apply_message(f, args, kwargs, threshold=64e-6): | |||
|
112 | """pack up a function, args, and kwargs to be sent over the wire | |||
|
113 | as a series of buffers. Any object whose data is larger than `threshold` | |||
|
114 | will not have their data copied (currently only numpy arrays support zero-copy)""" | |||
|
115 | msg = [pickle.dumps(can(f),-1)] | |||
|
116 | databuffers = [] # for large objects | |||
|
117 | sargs, bufs = serialize_object(args,threshold) | |||
|
118 | msg.append(sargs) | |||
|
119 | databuffers.extend(bufs) | |||
|
120 | skwargs, bufs = serialize_object(kwargs,threshold) | |||
|
121 | msg.append(skwargs) | |||
|
122 | databuffers.extend(bufs) | |||
|
123 | msg.extend(databuffers) | |||
|
124 | return msg | |||
|
125 | ||||
|
126 | def unpack_apply_message(bufs, g=None, copy=True): | |||
|
127 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |||
|
128 | Returns: original f,args,kwargs""" | |||
|
129 | bufs = list(bufs) # allow us to pop | |||
|
130 | assert len(bufs) >= 3, "not enough buffers!" | |||
|
131 | if not copy: | |||
|
132 | for i in range(3): | |||
|
133 | bufs[i] = bufs[i].bytes | |||
|
134 | cf = pickle.loads(bufs.pop(0)) | |||
|
135 | sargs = list(pickle.loads(bufs.pop(0))) | |||
|
136 | skwargs = dict(pickle.loads(bufs.pop(0))) | |||
|
137 | # print sargs, skwargs | |||
|
138 | f = uncan(cf, g) | |||
|
139 | for sa in sargs: | |||
|
140 | if sa.data is None: | |||
|
141 | m = bufs.pop(0) | |||
|
142 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |||
|
143 | # always use a buffer, until memoryviews get sorted out | |||
|
144 | sa.data = buffer(m) | |||
|
145 | # disable memoryview support | |||
|
146 | # if copy: | |||
|
147 | # sa.data = buffer(m) | |||
|
148 | # else: | |||
|
149 | # sa.data = m.buffer | |||
|
150 | else: | |||
|
151 | if copy: | |||
|
152 | sa.data = m | |||
|
153 | else: | |||
|
154 | sa.data = m.bytes | |||
|
155 | ||||
|
156 | args = uncanSequence(map(unserialize, sargs), g) | |||
|
157 | kwargs = {} | |||
|
158 | for k in sorted(skwargs.iterkeys()): | |||
|
159 | sa = skwargs[k] | |||
|
160 | if sa.data is None: | |||
|
161 | m = bufs.pop(0) | |||
|
162 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |||
|
163 | # always use a buffer, until memoryviews get sorted out | |||
|
164 | sa.data = buffer(m) | |||
|
165 | # disable memoryview support | |||
|
166 | # if copy: | |||
|
167 | # sa.data = buffer(m) | |||
|
168 | # else: | |||
|
169 | # sa.data = m.buffer | |||
|
170 | else: | |||
|
171 | if copy: | |||
|
172 | sa.data = m | |||
|
173 | else: | |||
|
174 | sa.data = m.bytes | |||
|
175 | ||||
|
176 | kwargs[k] = uncan(unserialize(sa), g) | |||
|
177 | ||||
|
178 | return f,args,kwargs | |||
|
179 |
@@ -133,6 +133,30 b' class Application(SingletonConfigurable):' | |||||
133 | self.log_level = new |
|
133 | self.log_level = new | |
134 | self.log.setLevel(new) |
|
134 | self.log.setLevel(new) | |
135 |
|
135 | |||
|
136 | log_format = Unicode("[%(name)s] %(message)s", config=True, | |||
|
137 | help="The Logging format template", | |||
|
138 | ) | |||
|
139 | log = Instance(logging.Logger) | |||
|
140 | def _log_default(self): | |||
|
141 | """Start logging for this application. | |||
|
142 | ||||
|
143 | The default is to log to stdout using a StreaHandler. The log level | |||
|
144 | starts at loggin.WARN, but this can be adjusted by setting the | |||
|
145 | ``log_level`` attribute. | |||
|
146 | """ | |||
|
147 | log = logging.getLogger(self.__class__.__name__) | |||
|
148 | log.setLevel(self.log_level) | |||
|
149 | if sys.executable.endswith('pythonw.exe'): | |||
|
150 | # this should really go to a file, but file-logging is only | |||
|
151 | # hooked up in parallel applications | |||
|
152 | _log_handler = logging.StreamHandler(open(os.devnull, 'w')) | |||
|
153 | else: | |||
|
154 | _log_handler = logging.StreamHandler() | |||
|
155 | _log_formatter = logging.Formatter(self.log_format) | |||
|
156 | _log_handler.setFormatter(_log_formatter) | |||
|
157 | log.addHandler(_log_handler) | |||
|
158 | return log | |||
|
159 | ||||
136 | # the alias map for configurables |
|
160 | # the alias map for configurables | |
137 | aliases = Dict({'log-level' : 'Application.log_level'}) |
|
161 | aliases = Dict({'log-level' : 'Application.log_level'}) | |
138 |
|
162 | |||
@@ -169,32 +193,11 b' class Application(SingletonConfigurable):' | |||||
169 | if self.__class__ not in self.classes: |
|
193 | if self.__class__ not in self.classes: | |
170 | self.classes.insert(0, self.__class__) |
|
194 | self.classes.insert(0, self.__class__) | |
171 |
|
195 | |||
172 | self.init_logging() |
|
|||
173 |
|
||||
174 | def _config_changed(self, name, old, new): |
|
196 | def _config_changed(self, name, old, new): | |
175 | SingletonConfigurable._config_changed(self, name, old, new) |
|
197 | SingletonConfigurable._config_changed(self, name, old, new) | |
176 | self.log.debug('Config changed:') |
|
198 | self.log.debug('Config changed:') | |
177 | self.log.debug(repr(new)) |
|
199 | self.log.debug(repr(new)) | |
178 |
|
200 | |||
179 | def init_logging(self): |
|
|||
180 | """Start logging for this application. |
|
|||
181 |
|
||||
182 | The default is to log to stdout using a StreaHandler. The log level |
|
|||
183 | starts at loggin.WARN, but this can be adjusted by setting the |
|
|||
184 | ``log_level`` attribute. |
|
|||
185 | """ |
|
|||
186 | self.log = logging.getLogger(self.__class__.__name__) |
|
|||
187 | self.log.setLevel(self.log_level) |
|
|||
188 | if sys.executable.endswith('pythonw.exe'): |
|
|||
189 | # this should really go to a file, but file-logging is only |
|
|||
190 | # hooked up in parallel applications |
|
|||
191 | self._log_handler = logging.StreamHandler(open(os.devnull, 'w')) |
|
|||
192 | else: |
|
|||
193 | self._log_handler = logging.StreamHandler() |
|
|||
194 | self._log_formatter = logging.Formatter("[%(name)s] %(message)s") |
|
|||
195 | self._log_handler.setFormatter(self._log_formatter) |
|
|||
196 | self.log.addHandler(self._log_handler) |
|
|||
197 |
|
||||
198 | @catch_config_error |
|
201 | @catch_config_error | |
199 | def initialize(self, argv=None): |
|
202 | def initialize(self, argv=None): | |
200 | """Do the basic steps to configure me. |
|
203 | """Do the basic steps to configure me. |
@@ -403,6 +403,7 b' class HistoryManager(HistoryAccessor):' | |||||
403 | self.save_flag = threading.Event() |
|
403 | self.save_flag = threading.Event() | |
404 | self.db_input_cache_lock = threading.Lock() |
|
404 | self.db_input_cache_lock = threading.Lock() | |
405 | self.db_output_cache_lock = threading.Lock() |
|
405 | self.db_output_cache_lock = threading.Lock() | |
|
406 | if self.hist_file != ':memory:': | |||
406 | self.save_thread = HistorySavingThread(self) |
|
407 | self.save_thread = HistorySavingThread(self) | |
407 | self.save_thread.start() |
|
408 | self.save_thread.start() | |
408 |
|
409 |
@@ -712,6 +712,7 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||||
712 | self._orig_sys_module_state['stderr'] = sys.stderr |
|
712 | self._orig_sys_module_state['stderr'] = sys.stderr | |
713 | self._orig_sys_module_state['excepthook'] = sys.excepthook |
|
713 | self._orig_sys_module_state['excepthook'] = sys.excepthook | |
714 | self._orig_sys_modules_main_name = self.user_module.__name__ |
|
714 | self._orig_sys_modules_main_name = self.user_module.__name__ | |
|
715 | self._orig_sys_modules_main_mod = sys.modules.get(self.user_module.__name__) | |||
715 |
|
716 | |||
716 | def restore_sys_module_state(self): |
|
717 | def restore_sys_module_state(self): | |
717 | """Restore the state of the sys module.""" |
|
718 | """Restore the state of the sys module.""" | |
@@ -721,7 +722,8 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||||
721 | except AttributeError: |
|
722 | except AttributeError: | |
722 | pass |
|
723 | pass | |
723 | # Reset what what done in self.init_sys_modules |
|
724 | # Reset what what done in self.init_sys_modules | |
724 | sys.modules[self.user_module.__name__] = self._orig_sys_modules_main_name |
|
725 | if self._orig_sys_modules_main_mod is not None: | |
|
726 | sys.modules[self._orig_sys_modules_main_name] = self._orig_sys_modules_main_mod | |||
725 |
|
727 | |||
726 | #------------------------------------------------------------------------- |
|
728 | #------------------------------------------------------------------------- | |
727 | # Things related to hooks |
|
729 | # Things related to hooks | |
@@ -2424,7 +2426,7 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||||
2424 | self.showtraceback() |
|
2426 | self.showtraceback() | |
2425 | warn('Unknown failure executing module: <%s>' % mod_name) |
|
2427 | warn('Unknown failure executing module: <%s>' % mod_name) | |
2426 |
|
2428 | |||
2427 | def run_cell(self, raw_cell, store_history=False): |
|
2429 | def run_cell(self, raw_cell, store_history=False, silent=False): | |
2428 | """Run a complete IPython cell. |
|
2430 | """Run a complete IPython cell. | |
2429 |
|
2431 | |||
2430 | Parameters |
|
2432 | Parameters | |
@@ -2435,10 +2437,16 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||||
2435 | If True, the raw and translated cell will be stored in IPython's |
|
2437 | If True, the raw and translated cell will be stored in IPython's | |
2436 | history. For user code calling back into IPython's machinery, this |
|
2438 | history. For user code calling back into IPython's machinery, this | |
2437 | should be set to False. |
|
2439 | should be set to False. | |
|
2440 | silent : bool | |||
|
2441 | If True, avoid side-effets, such as implicit displayhooks, history, | |||
|
2442 | and logging. silent=True forces store_history=False. | |||
2438 | """ |
|
2443 | """ | |
2439 | if (not raw_cell) or raw_cell.isspace(): |
|
2444 | if (not raw_cell) or raw_cell.isspace(): | |
2440 | return |
|
2445 | return | |
2441 |
|
2446 | |||
|
2447 | if silent: | |||
|
2448 | store_history = False | |||
|
2449 | ||||
2442 | for line in raw_cell.splitlines(): |
|
2450 | for line in raw_cell.splitlines(): | |
2443 | self.input_splitter.push(line) |
|
2451 | self.input_splitter.push(line) | |
2444 | cell = self.input_splitter.source_reset() |
|
2452 | cell = self.input_splitter.source_reset() | |
@@ -2462,7 +2470,7 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||||
2462 | if store_history: |
|
2470 | if store_history: | |
2463 | self.history_manager.store_inputs(self.execution_count, |
|
2471 | self.history_manager.store_inputs(self.execution_count, | |
2464 | cell, raw_cell) |
|
2472 | cell, raw_cell) | |
2465 |
|
2473 | if not silent: | ||
2466 | self.logger.log(cell, raw_cell) |
|
2474 | self.logger.log(cell, raw_cell) | |
2467 |
|
2475 | |||
2468 | if not prefilter_failed: |
|
2476 | if not prefilter_failed: | |
@@ -2484,11 +2492,15 b' class InteractiveShell(SingletonConfigurable, Magic):' | |||||
2484 | self.execution_count += 1 |
|
2492 | self.execution_count += 1 | |
2485 | return None |
|
2493 | return None | |
2486 |
|
2494 | |||
|
2495 | interactivity = "none" if silent else "last_expr" | |||
2487 | self.run_ast_nodes(code_ast.body, cell_name, |
|
2496 | self.run_ast_nodes(code_ast.body, cell_name, | |
2488 |
interactivity= |
|
2497 | interactivity=interactivity) | |
2489 |
|
2498 | |||
2490 | # Execute any registered post-execution functions. |
|
2499 | # Execute any registered post-execution functions. | |
2491 | for func, status in self._post_execute.iteritems(): |
|
2500 | # unless we are silent | |
|
2501 | post_exec = [] if silent else self._post_execute.iteritems() | |||
|
2502 | ||||
|
2503 | for func, status in post_exec: | |||
2492 | if self.disable_failing_post_execute and not status: |
|
2504 | if self.disable_failing_post_execute and not status: | |
2493 | continue |
|
2505 | continue | |
2494 | try: |
|
2506 | try: |
@@ -255,6 +255,61 b' class InteractiveShellTestCase(unittest.TestCase):' | |||||
255 | # ZeroDivisionError |
|
255 | # ZeroDivisionError | |
256 | self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}") |
|
256 | self.assertEqual(ip.var_expand(u"{1/0}"), u"{1/0}") | |
257 |
|
257 | |||
|
258 | def test_silent_nopostexec(self): | |||
|
259 | """run_cell(silent=True) doesn't invoke post-exec funcs""" | |||
|
260 | ip = get_ipython() | |||
|
261 | ||||
|
262 | d = dict(called=False) | |||
|
263 | def set_called(): | |||
|
264 | d['called'] = True | |||
|
265 | ||||
|
266 | ip.register_post_execute(set_called) | |||
|
267 | ip.run_cell("1", silent=True) | |||
|
268 | self.assertFalse(d['called']) | |||
|
269 | # double-check that non-silent exec did what we expected | |||
|
270 | # silent to avoid | |||
|
271 | ip.run_cell("1") | |||
|
272 | self.assertTrue(d['called']) | |||
|
273 | # remove post-exec | |||
|
274 | ip._post_execute.pop(set_called) | |||
|
275 | ||||
|
276 | def test_silent_noadvance(self): | |||
|
277 | """run_cell(silent=True) doesn't advance execution_count""" | |||
|
278 | ip = get_ipython() | |||
|
279 | ||||
|
280 | ec = ip.execution_count | |||
|
281 | # silent should force store_history=False | |||
|
282 | ip.run_cell("1", store_history=True, silent=True) | |||
|
283 | ||||
|
284 | self.assertEquals(ec, ip.execution_count) | |||
|
285 | # double-check that non-silent exec did what we expected | |||
|
286 | # silent to avoid | |||
|
287 | ip.run_cell("1", store_history=True) | |||
|
288 | self.assertEquals(ec+1, ip.execution_count) | |||
|
289 | ||||
|
290 | def test_silent_nodisplayhook(self): | |||
|
291 | """run_cell(silent=True) doesn't trigger displayhook""" | |||
|
292 | ip = get_ipython() | |||
|
293 | ||||
|
294 | d = dict(called=False) | |||
|
295 | ||||
|
296 | trap = ip.display_trap | |||
|
297 | save_hook = trap.hook | |||
|
298 | ||||
|
299 | def failing_hook(*args, **kwargs): | |||
|
300 | d['called'] = True | |||
|
301 | ||||
|
302 | try: | |||
|
303 | trap.hook = failing_hook | |||
|
304 | ip.run_cell("1", silent=True) | |||
|
305 | self.assertFalse(d['called']) | |||
|
306 | # double-check that non-silent exec did what we expected | |||
|
307 | # silent to avoid | |||
|
308 | ip.run_cell("1") | |||
|
309 | self.assertTrue(d['called']) | |||
|
310 | finally: | |||
|
311 | trap.hook = save_hook | |||
|
312 | ||||
258 | @skipif(sys.version_info[0] >= 3, "softspace removed in py3") |
|
313 | @skipif(sys.version_info[0] >= 3, "softspace removed in py3") | |
259 | def test_print_softspace(self): |
|
314 | def test_print_softspace(self): | |
260 | """Verify that softspace is handled correctly when executing multiple |
|
315 | """Verify that softspace is handled correctly when executing multiple |
@@ -211,7 +211,7 b' class ParalleMagic(Plugin):' | |||||
211 | print '[stdout:%i]'%eid, stdout |
|
211 | print '[stdout:%i]'%eid, stdout | |
212 |
|
212 | |||
213 |
|
213 | |||
214 |
def pxrun_cell(self, raw_cell, store_history= |
|
214 | def pxrun_cell(self, raw_cell, store_history=False, silent=False): | |
215 | """drop-in replacement for InteractiveShell.run_cell. |
|
215 | """drop-in replacement for InteractiveShell.run_cell. | |
216 |
|
216 | |||
217 | This executes code remotely, instead of in the local namespace. |
|
217 | This executes code remotely, instead of in the local namespace. | |
@@ -258,7 +258,7 b' class ParalleMagic(Plugin):' | |||||
258 | return False |
|
258 | return False | |
259 | else: |
|
259 | else: | |
260 | try: |
|
260 | try: | |
261 | result = self.active_view.execute(cell, block=False) |
|
261 | result = self.active_view.execute(cell, silent=False, block=False) | |
262 | except: |
|
262 | except: | |
263 | ipself.showtraceback() |
|
263 | ipself.showtraceback() | |
264 | return True |
|
264 | return True |
@@ -140,8 +140,6 b' class IPythonConsoleApp(Configurable):' | |||||
140 | frontend_flags = Any(app_flags) |
|
140 | frontend_flags = Any(app_flags) | |
141 | frontend_aliases = Any(app_aliases) |
|
141 | frontend_aliases = Any(app_aliases) | |
142 |
|
142 | |||
143 | pure = CBool(False, config=True, |
|
|||
144 | help="Use a pure Python kernel instead of an IPython kernel.") |
|
|||
145 | # create requested profiles by default, if they don't exist: |
|
143 | # create requested profiles by default, if they don't exist: | |
146 | auto_create = CBool(True) |
|
144 | auto_create = CBool(True) | |
147 | # connection info: |
|
145 | # connection info: | |
@@ -330,9 +328,7 b' class IPythonConsoleApp(Configurable):' | |||||
330 | ) |
|
328 | ) | |
331 | # start the kernel |
|
329 | # start the kernel | |
332 | if not self.existing: |
|
330 | if not self.existing: | |
333 | kwargs = dict(ipython=not self.pure) |
|
331 | self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv) | |
334 | kwargs['extra_arguments'] = self.kernel_argv |
|
|||
335 | self.kernel_manager.start_kernel(**kwargs) |
|
|||
336 | elif self.sshserver: |
|
332 | elif self.sshserver: | |
337 | # ssh, write new connection file |
|
333 | # ssh, write new connection file | |
338 | self.kernel_manager.write_connection_file() |
|
334 | self.kernel_manager.write_connection_file() |
@@ -45,8 +45,6 b' class DummyIPClusterStart(IPClusterStart):' | |||||
45 |
|
45 | |||
46 | def init_signal(self): |
|
46 | def init_signal(self): | |
47 | pass |
|
47 | pass | |
48 | def init_logging(self): |
|
|||
49 | pass |
|
|||
50 | def reinit_logging(self): |
|
48 | def reinit_logging(self): | |
51 | pass |
|
49 | pass | |
52 |
|
50 |
@@ -30,6 +30,7 b' from zmq.utils import jsonapi' | |||||
30 | from IPython.external.decorator import decorator |
|
30 | from IPython.external.decorator import decorator | |
31 | from IPython.zmq.session import Session |
|
31 | from IPython.zmq.session import Session | |
32 | from IPython.lib.security import passwd_check |
|
32 | from IPython.lib.security import passwd_check | |
|
33 | from IPython.utils.jsonutil import date_default | |||
33 |
|
34 | |||
34 | try: |
|
35 | try: | |
35 | from docutils.core import publish_string |
|
36 | from docutils.core import publish_string | |
@@ -385,13 +386,13 b' class ZMQStreamHandler(websocket.WebSocketHandler):' | |||||
385 | except KeyError: |
|
386 | except KeyError: | |
386 | pass |
|
387 | pass | |
387 | msg.pop('buffers') |
|
388 | msg.pop('buffers') | |
388 | return jsonapi.dumps(msg) |
|
389 | return jsonapi.dumps(msg, default=date_default) | |
389 |
|
390 | |||
390 | def _on_zmq_reply(self, msg_list): |
|
391 | def _on_zmq_reply(self, msg_list): | |
391 | try: |
|
392 | try: | |
392 | msg = self._reserialize_reply(msg_list) |
|
393 | msg = self._reserialize_reply(msg_list) | |
393 | except: |
|
394 | except Exception: | |
394 | self.application.log.critical("Malformed message: %r" % msg_list) |
|
395 | self.application.log.critical("Malformed message: %r" % msg_list, exc_info=True) | |
395 | else: |
|
396 | else: | |
396 | self.write_message(msg) |
|
397 | self.write_message(msg) | |
397 |
|
398 |
@@ -397,7 +397,6 b' class NotebookApp(BaseIPythonApplication):' | |||||
397 | self.cluster_manager.update_profiles() |
|
397 | self.cluster_manager.update_profiles() | |
398 |
|
398 | |||
399 | def init_logging(self): |
|
399 | def init_logging(self): | |
400 | super(NotebookApp, self).init_logging() |
|
|||
401 | # This prevents double log messages because tornado use a root logger that |
|
400 | # This prevents double log messages because tornado use a root logger that | |
402 | # self.log is a child of. The logging module dipatches log messages to a log |
|
401 | # self.log is a child of. The logging module dipatches log messages to a log | |
403 | # and all of its ancenstors until propagate is set to False. |
|
402 | # and all of its ancenstors until propagate is set to False. | |
@@ -500,6 +499,7 b' class NotebookApp(BaseIPythonApplication):' | |||||
500 |
|
499 | |||
501 | @catch_config_error |
|
500 | @catch_config_error | |
502 | def initialize(self, argv=None): |
|
501 | def initialize(self, argv=None): | |
|
502 | self.init_logging() | |||
503 | super(NotebookApp, self).initialize(argv) |
|
503 | super(NotebookApp, self).initialize(argv) | |
504 | self.init_configurables() |
|
504 | self.init_configurables() | |
505 | self.init_webapp() |
|
505 | self.init_webapp() |
@@ -101,9 +101,7 b' ipython qtconsole --pylab=inline # start with pylab in inline plotting mode' | |||||
101 | # start with copy of flags |
|
101 | # start with copy of flags | |
102 | flags = dict(flags) |
|
102 | flags = dict(flags) | |
103 | qt_flags = { |
|
103 | qt_flags = { | |
104 |
'p |
|
104 | 'plain' : ({'IPythonQtConsoleApp' : {'plain' : True}}, | |
105 | "Use a pure Python kernel instead of an IPython kernel."), |
|
|||
106 | 'plain' : ({'ConsoleWidget' : {'kind' : 'plain'}}, |
|
|||
107 | "Disable rich text support."), |
|
105 | "Disable rich text support."), | |
108 | } |
|
106 | } | |
109 | qt_flags.update(boolean_flag( |
|
107 | qt_flags.update(boolean_flag( | |
@@ -180,18 +178,14 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):' | |||||
180 | plain = CBool(False, config=True, |
|
178 | plain = CBool(False, config=True, | |
181 | help="Use a plaintext widget instead of rich text (plain can't print/save).") |
|
179 | help="Use a plaintext widget instead of rich text (plain can't print/save).") | |
182 |
|
180 | |||
183 |
def _p |
|
181 | def _plain_changed(self, name, old, new): | |
184 |
kind = 'plain' if |
|
182 | kind = 'plain' if new else 'rich' | |
185 | self.config.ConsoleWidget.kind = kind |
|
183 | self.config.ConsoleWidget.kind = kind | |
186 |
if |
|
184 | if new: | |
187 | self.widget_factory = FrontendWidget |
|
|||
188 | elif self.plain: |
|
|||
189 | self.widget_factory = IPythonWidget |
|
185 | self.widget_factory = IPythonWidget | |
190 | else: |
|
186 | else: | |
191 | self.widget_factory = RichIPythonWidget |
|
187 | self.widget_factory = RichIPythonWidget | |
192 |
|
188 | |||
193 | _plain_changed = _pure_changed |
|
|||
194 |
|
||||
195 | # the factory for creating a widget |
|
189 | # the factory for creating a widget | |
196 | widget_factory = Any(RichIPythonWidget) |
|
190 | widget_factory = Any(RichIPythonWidget) | |
197 |
|
191 | |||
@@ -210,7 +204,7 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):' | |||||
210 | config=self.config, |
|
204 | config=self.config, | |
211 | ) |
|
205 | ) | |
212 | # start the kernel |
|
206 | # start the kernel | |
213 |
kwargs = dict( |
|
207 | kwargs = dict() | |
214 | kwargs['extra_arguments'] = self.kernel_argv |
|
208 | kwargs['extra_arguments'] = self.kernel_argv | |
215 | kernel_manager.start_kernel(**kwargs) |
|
209 | kernel_manager.start_kernel(**kwargs) | |
216 | kernel_manager.start_channels() |
|
210 | kernel_manager.start_channels() | |
@@ -273,17 +267,13 b' class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp):' | |||||
273 | self.window.add_tab_with_frontend(self.widget) |
|
267 | self.window.add_tab_with_frontend(self.widget) | |
274 | self.window.init_menu_bar() |
|
268 | self.window.init_menu_bar() | |
275 |
|
269 | |||
276 |
self.window.setWindowTitle(' |
|
270 | self.window.setWindowTitle('IPython') | |
277 |
|
271 | |||
278 | def init_colors(self, widget): |
|
272 | def init_colors(self, widget): | |
279 | """Configure the coloring of the widget""" |
|
273 | """Configure the coloring of the widget""" | |
280 | # Note: This will be dramatically simplified when colors |
|
274 | # Note: This will be dramatically simplified when colors | |
281 | # are removed from the backend. |
|
275 | # are removed from the backend. | |
282 |
|
276 | |||
283 | if self.pure: |
|
|||
284 | # only IPythonWidget supports styling |
|
|||
285 | return |
|
|||
286 |
|
||||
287 | # parse the colors arg down to current known labels |
|
277 | # parse the colors arg down to current known labels | |
288 | try: |
|
278 | try: | |
289 | colors = self.config.ZMQInteractiveShell.colors |
|
279 | colors = self.config.ZMQInteractiveShell.colors |
@@ -46,11 +46,11 b' def get_connection_file(app=None):' | |||||
46 | If unspecified, the currently running app will be used |
|
46 | If unspecified, the currently running app will be used | |
47 | """ |
|
47 | """ | |
48 | if app is None: |
|
48 | if app is None: | |
49 |
from IPython.zmq.kernel |
|
49 | from IPython.zmq.ipkernel import IPKernelApp | |
50 | if not KernelApp.initialized(): |
|
50 | if not IPKernelApp.initialized(): | |
51 | raise RuntimeError("app not specified, and not in a running Kernel") |
|
51 | raise RuntimeError("app not specified, and not in a running Kernel") | |
52 |
|
52 | |||
53 | app = KernelApp.instance() |
|
53 | app = IPKernelApp.instance() | |
54 | return filefind(app.connection_file, ['.', app.profile_dir.security_dir]) |
|
54 | return filefind(app.connection_file, ['.', app.profile_dir.security_dir]) | |
55 |
|
55 | |||
56 | def find_connection_file(filename, profile=None): |
|
56 | def find_connection_file(filename, profile=None): |
@@ -35,6 +35,28 b' from .client.asyncresult import *' | |||||
35 | from .client.client import Client |
|
35 | from .client.client import Client | |
36 | from .client.remotefunction import * |
|
36 | from .client.remotefunction import * | |
37 | from .client.view import * |
|
37 | from .client.view import * | |
|
38 | from .util import interactive | |||
38 | from .controller.dependency import * |
|
39 | from .controller.dependency import * | |
39 |
|
40 | |||
|
41 | #----------------------------------------------------------------------------- | |||
|
42 | # Functions | |||
|
43 | #----------------------------------------------------------------------------- | |||
|
44 | ||||
|
45 | ||||
|
46 | def bind_kernel(**kwargs): | |||
|
47 | """Bind an Engine's Kernel to be used as a full IPython kernel. | |||
|
48 | ||||
|
49 | This allows a running Engine to be used simultaneously as a full IPython kernel | |||
|
50 | with the QtConsole or other frontends. | |||
|
51 | ||||
|
52 | This function returns immediately. | |||
|
53 | """ | |||
|
54 | from IPython.parallel.apps.ipengineapp import IPEngineApp | |||
|
55 | if IPEngineApp.initialized(): | |||
|
56 | app = IPEngineApp.instance() | |||
|
57 | else: | |||
|
58 | raise RuntimeError("Must be called from an IPEngineApp instance") | |||
|
59 | ||||
|
60 | return app.bind_kernel(**kwargs) | |||
|
61 | ||||
40 |
|
62 |
@@ -103,6 +103,10 b' class BaseParallelApplication(BaseIPythonApplication):' | |||||
103 | # temporarily override default_log_level to INFO |
|
103 | # temporarily override default_log_level to INFO | |
104 | return logging.INFO |
|
104 | return logging.INFO | |
105 |
|
105 | |||
|
106 | def _log_format_default(self): | |||
|
107 | """override default log format to include time""" | |||
|
108 | return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s" | |||
|
109 | ||||
106 | work_dir = Unicode(os.getcwdu(), config=True, |
|
110 | work_dir = Unicode(os.getcwdu(), config=True, | |
107 | help='Set the working dir for the process.' |
|
111 | help='Set the working dir for the process.' | |
108 | ) |
|
112 | ) | |
@@ -175,11 +179,14 b' class BaseParallelApplication(BaseIPythonApplication):' | |||||
175 | else: |
|
179 | else: | |
176 | open_log_file = None |
|
180 | open_log_file = None | |
177 | if open_log_file is not None: |
|
181 | if open_log_file is not None: | |
178 |
self.log. |
|
182 | while self.log.handlers: | |
|
183 | self.log.removeHandler(self.log.handlers[0]) | |||
179 | self._log_handler = logging.StreamHandler(open_log_file) |
|
184 | self._log_handler = logging.StreamHandler(open_log_file) | |
180 | self.log.addHandler(self._log_handler) |
|
185 | self.log.addHandler(self._log_handler) | |
|
186 | else: | |||
|
187 | self._log_handler = self.log.handlers[0] | |||
181 | # Add timestamps to log format: |
|
188 | # Add timestamps to log format: | |
182 |
self._log_formatter = logging.Formatter( |
|
189 | self._log_formatter = logging.Formatter(self.log_format, | |
183 | datefmt="%Y-%m-%d %H:%M:%S") |
|
190 | datefmt="%Y-%m-%d %H:%M:%S") | |
184 | self._log_handler.setFormatter(self._log_formatter) |
|
191 | self._log_handler.setFormatter(self._log_formatter) | |
185 | # do not propagate log messages to root logger |
|
192 | # do not propagate log messages to root logger |
@@ -434,11 +434,9 b' class IPControllerApp(BaseParallelApplication):' | |||||
434 | lsock = context.socket(zmq.PUB) |
|
434 | lsock = context.socket(zmq.PUB) | |
435 | lsock.connect(self.log_url) |
|
435 | lsock.connect(self.log_url) | |
436 | handler = PUBHandler(lsock) |
|
436 | handler = PUBHandler(lsock) | |
437 | self.log.removeHandler(self._log_handler) |
|
|||
438 | handler.root_topic = 'controller' |
|
437 | handler.root_topic = 'controller' | |
439 | handler.setLevel(self.log_level) |
|
438 | handler.setLevel(self.log_level) | |
440 | self.log.addHandler(handler) |
|
439 | self.log.addHandler(handler) | |
441 | self._log_handler = handler |
|
|||
442 |
|
440 | |||
443 | @catch_config_error |
|
441 | @catch_config_error | |
444 | def initialize(self, argv=None): |
|
442 | def initialize(self, argv=None): |
@@ -37,6 +37,7 b' from IPython.parallel.apps.baseapp import (' | |||||
37 | catch_config_error, |
|
37 | catch_config_error, | |
38 | ) |
|
38 | ) | |
39 | from IPython.zmq.log import EnginePUBHandler |
|
39 | from IPython.zmq.log import EnginePUBHandler | |
|
40 | from IPython.zmq.ipkernel import Kernel, IPKernelApp | |||
40 | from IPython.zmq.session import ( |
|
41 | from IPython.zmq.session import ( | |
41 | Session, session_aliases, session_flags |
|
42 | Session, session_aliases, session_flags | |
42 | ) |
|
43 | ) | |
@@ -44,11 +45,11 b' from IPython.zmq.session import (' | |||||
44 | from IPython.config.configurable import Configurable |
|
45 | from IPython.config.configurable import Configurable | |
45 |
|
46 | |||
46 | from IPython.parallel.engine.engine import EngineFactory |
|
47 | from IPython.parallel.engine.engine import EngineFactory | |
47 |
from IPython.parallel. |
|
48 | from IPython.parallel.util import disambiguate_url | |
48 | from IPython.parallel.util import disambiguate_url, asbytes |
|
|||
49 |
|
49 | |||
50 | from IPython.utils.importstring import import_item |
|
50 | from IPython.utils.importstring import import_item | |
51 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float |
|
51 | from IPython.utils.py3compat import cast_bytes | |
|
52 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance | |||
52 |
|
53 | |||
53 |
|
54 | |||
54 | #----------------------------------------------------------------------------- |
|
55 | #----------------------------------------------------------------------------- | |
@@ -174,9 +175,17 b' class IPEngineApp(BaseParallelApplication):' | |||||
174 | help="""The URL for the iploggerapp instance, for forwarding |
|
175 | help="""The URL for the iploggerapp instance, for forwarding | |
175 | logging to a central location.""") |
|
176 | logging to a central location.""") | |
176 |
|
177 | |||
|
178 | # an IPKernelApp instance, used to setup listening for shell frontends | |||
|
179 | kernel_app = Instance(IPKernelApp) | |||
|
180 | ||||
177 | aliases = Dict(aliases) |
|
181 | aliases = Dict(aliases) | |
178 | flags = Dict(flags) |
|
182 | flags = Dict(flags) | |
179 |
|
183 | |||
|
184 | @property | |||
|
185 | def kernel(self): | |||
|
186 | """allow access to the Kernel object, so I look like IPKernelApp""" | |||
|
187 | return self.engine.kernel | |||
|
188 | ||||
180 | def find_url_file(self): |
|
189 | def find_url_file(self): | |
181 | """Set the url file. |
|
190 | """Set the url file. | |
182 |
|
191 | |||
@@ -203,7 +212,7 b' class IPEngineApp(BaseParallelApplication):' | |||||
203 | d = json.loads(f.read()) |
|
212 | d = json.loads(f.read()) | |
204 |
|
213 | |||
205 | if 'exec_key' in d: |
|
214 | if 'exec_key' in d: | |
206 | config.Session.key = asbytes(d['exec_key']) |
|
215 | config.Session.key = cast_bytes(d['exec_key']) | |
207 |
|
216 | |||
208 | try: |
|
217 | try: | |
209 | config.EngineFactory.location |
|
218 | config.EngineFactory.location | |
@@ -221,6 +230,46 b' class IPEngineApp(BaseParallelApplication):' | |||||
221 | except AttributeError: |
|
230 | except AttributeError: | |
222 | config.EngineFactory.sshserver = d['ssh'] |
|
231 | config.EngineFactory.sshserver = d['ssh'] | |
223 |
|
|
232 | ||
|
233 | def bind_kernel(self, **kwargs): | |||
|
234 | """Promote engine to listening kernel, accessible to frontends.""" | |||
|
235 | if self.kernel_app is not None: | |||
|
236 | return | |||
|
237 | ||||
|
238 | self.log.info("Opening ports for direct connections as an IPython kernel") | |||
|
239 | ||||
|
240 | kernel = self.kernel | |||
|
241 | ||||
|
242 | kwargs.setdefault('config', self.config) | |||
|
243 | kwargs.setdefault('log', self.log) | |||
|
244 | kwargs.setdefault('profile_dir', self.profile_dir) | |||
|
245 | kwargs.setdefault('session', self.engine.session) | |||
|
246 | ||||
|
247 | app = self.kernel_app = IPKernelApp(**kwargs) | |||
|
248 | ||||
|
249 | # allow IPKernelApp.instance(): | |||
|
250 | IPKernelApp._instance = app | |||
|
251 | ||||
|
252 | app.init_connection_file() | |||
|
253 | # relevant contents of init_sockets: | |||
|
254 | ||||
|
255 | app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port) | |||
|
256 | app.log.debug("shell ROUTER Channel on port: %i", app.shell_port) | |||
|
257 | ||||
|
258 | app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port) | |||
|
259 | app.log.debug("iopub PUB Channel on port: %i", app.iopub_port) | |||
|
260 | ||||
|
261 | kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER) | |||
|
262 | app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port) | |||
|
263 | app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port) | |||
|
264 | ||||
|
265 | # start the heartbeat, and log connection info: | |||
|
266 | ||||
|
267 | app.init_heartbeat() | |||
|
268 | ||||
|
269 | app.log_connection_info() | |||
|
270 | app.write_connection_file() | |||
|
271 | ||||
|
272 | ||||
224 | def init_engine(self): |
|
273 | def init_engine(self): | |
225 | # This is the working dir by now. |
|
274 | # This is the working dir by now. | |
226 | sys.path.insert(0, '') |
|
275 | sys.path.insert(0, '') | |
@@ -282,11 +331,9 b' class IPEngineApp(BaseParallelApplication):' | |||||
282 | context = self.engine.context |
|
331 | context = self.engine.context | |
283 | lsock = context.socket(zmq.PUB) |
|
332 | lsock = context.socket(zmq.PUB) | |
284 | lsock.connect(self.log_url) |
|
333 | lsock.connect(self.log_url) | |
285 | self.log.removeHandler(self._log_handler) |
|
|||
286 | handler = EnginePUBHandler(self.engine, lsock) |
|
334 | handler = EnginePUBHandler(self.engine, lsock) | |
287 | handler.setLevel(self.log_level) |
|
335 | handler.setLevel(self.log_level) | |
288 | self.log.addHandler(handler) |
|
336 | self.log.addHandler(handler) | |
289 | self._log_handler = handler |
|
|||
290 |
|
337 | |||
291 | def init_mpi(self): |
|
338 | def init_mpi(self): | |
292 | global mpi |
|
339 | global mpi |
@@ -23,6 +23,7 b' import copy' | |||||
23 | import logging |
|
23 | import logging | |
24 | import os |
|
24 | import os | |
25 | import stat |
|
25 | import stat | |
|
26 | import sys | |||
26 | import time |
|
27 | import time | |
27 |
|
28 | |||
28 | # signal imports, handling various platforms, versions |
|
29 | # signal imports, handling various platforms, versions | |
@@ -59,8 +60,8 b' from IPython.utils.text import EvalFormatter' | |||||
59 | from IPython.utils.traitlets import ( |
|
60 | from IPython.utils.traitlets import ( | |
60 | Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp |
|
61 | Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp | |
61 | ) |
|
62 | ) | |
62 |
from IPython.utils.path import |
|
63 | from IPython.utils.path import get_home_dir | |
63 |
from IPython.utils.process import find_cmd, |
|
64 | from IPython.utils.process import find_cmd, FindCmdError | |
64 |
|
65 | |||
65 | from .win32support import forward_read_events |
|
66 | from .win32support import forward_read_events | |
66 |
|
67 | |||
@@ -72,18 +73,13 b" WINDOWS = os.name == 'nt'" | |||||
72 | # Paths to the kernel apps |
|
73 | # Paths to the kernel apps | |
73 | #----------------------------------------------------------------------------- |
|
74 | #----------------------------------------------------------------------------- | |
74 |
|
75 | |||
|
76 | cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()" | |||
75 |
|
77 | |||
76 | ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path( |
|
78 | ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"] | |
77 | 'IPython.parallel.apps.ipclusterapp' |
|
|||
78 | )) |
|
|||
79 |
|
79 | |||
80 | ipengine_cmd_argv = pycmd2argv(get_ipython_module_path( |
|
80 | ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"] | |
81 | 'IPython.parallel.apps.ipengineapp' |
|
|||
82 | )) |
|
|||
83 |
|
81 | |||
84 | ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path( |
|
82 | ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"] | |
85 | 'IPython.parallel.apps.ipcontrollerapp' |
|
|||
86 | )) |
|
|||
87 |
|
83 | |||
88 | #----------------------------------------------------------------------------- |
|
84 | #----------------------------------------------------------------------------- | |
89 | # Base launchers and errors |
|
85 | # Base launchers and errors |
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication' | |||||
36 | from IPython.utils.jsonutil import rekey |
|
36 | from IPython.utils.jsonutil import rekey | |
37 | from IPython.utils.localinterfaces import LOCAL_IPS |
|
37 | from IPython.utils.localinterfaces import LOCAL_IPS | |
38 | from IPython.utils.path import get_ipython_dir |
|
38 | from IPython.utils.path import get_ipython_dir | |
|
39 | from IPython.utils.py3compat import cast_bytes | |||
39 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, |
|
40 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, | |
40 | Dict, List, Bool, Set, Any) |
|
41 | Dict, List, Bool, Set, Any) | |
41 | from IPython.external.decorator import decorator |
|
42 | from IPython.external.decorator import decorator | |
@@ -71,6 +72,60 b' def spin_first(f, self, *args, **kwargs):' | |||||
71 | # Classes |
|
72 | # Classes | |
72 | #-------------------------------------------------------------------------- |
|
73 | #-------------------------------------------------------------------------- | |
73 |
|
74 | |||
|
75 | ||||
|
76 | class ExecuteReply(object): | |||
|
77 | """wrapper for finished Execute results""" | |||
|
78 | def __init__(self, msg_id, content, metadata): | |||
|
79 | self.msg_id = msg_id | |||
|
80 | self._content = content | |||
|
81 | self.execution_count = content['execution_count'] | |||
|
82 | self.metadata = metadata | |||
|
83 | ||||
|
84 | def __getitem__(self, key): | |||
|
85 | return self.metadata[key] | |||
|
86 | ||||
|
87 | def __getattr__(self, key): | |||
|
88 | if key not in self.metadata: | |||
|
89 | raise AttributeError(key) | |||
|
90 | return self.metadata[key] | |||
|
91 | ||||
|
92 | def __repr__(self): | |||
|
93 | pyout = self.metadata['pyout'] or {} | |||
|
94 | text_out = pyout.get('data', {}).get('text/plain', '') | |||
|
95 | if len(text_out) > 32: | |||
|
96 | text_out = text_out[:29] + '...' | |||
|
97 | ||||
|
98 | return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out) | |||
|
99 | ||||
|
100 | def _repr_html_(self): | |||
|
101 | pyout = self.metadata['pyout'] or {'data':{}} | |||
|
102 | return pyout['data'].get("text/html") | |||
|
103 | ||||
|
104 | def _repr_latex_(self): | |||
|
105 | pyout = self.metadata['pyout'] or {'data':{}} | |||
|
106 | return pyout['data'].get("text/latex") | |||
|
107 | ||||
|
108 | def _repr_json_(self): | |||
|
109 | pyout = self.metadata['pyout'] or {'data':{}} | |||
|
110 | return pyout['data'].get("application/json") | |||
|
111 | ||||
|
112 | def _repr_javascript_(self): | |||
|
113 | pyout = self.metadata['pyout'] or {'data':{}} | |||
|
114 | return pyout['data'].get("application/javascript") | |||
|
115 | ||||
|
116 | def _repr_png_(self): | |||
|
117 | pyout = self.metadata['pyout'] or {'data':{}} | |||
|
118 | return pyout['data'].get("image/png") | |||
|
119 | ||||
|
120 | def _repr_jpeg_(self): | |||
|
121 | pyout = self.metadata['pyout'] or {'data':{}} | |||
|
122 | return pyout['data'].get("image/jpeg") | |||
|
123 | ||||
|
124 | def _repr_svg_(self): | |||
|
125 | pyout = self.metadata['pyout'] or {'data':{}} | |||
|
126 | return pyout['data'].get("image/svg+xml") | |||
|
127 | ||||
|
128 | ||||
74 | class Metadata(dict): |
|
129 | class Metadata(dict): | |
75 | """Subclass of dict for initializing metadata values. |
|
130 | """Subclass of dict for initializing metadata values. | |
76 |
|
131 | |||
@@ -97,6 +152,7 b' class Metadata(dict):' | |||||
97 | 'pyerr' : None, |
|
152 | 'pyerr' : None, | |
98 | 'stdout' : '', |
|
153 | 'stdout' : '', | |
99 | 'stderr' : '', |
|
154 | 'stderr' : '', | |
|
155 | 'outputs' : [], | |||
100 | } |
|
156 | } | |
101 | self.update(md) |
|
157 | self.update(md) | |
102 | self.update(dict(*args, **kwargs)) |
|
158 | self.update(dict(*args, **kwargs)) | |
@@ -308,15 +364,19 b' class Client(HasTraits):' | |||||
308 | if self._cd is not None: |
|
364 | if self._cd is not None: | |
309 | if url_or_file is None: |
|
365 | if url_or_file is None: | |
310 | url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') |
|
366 | url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') | |
311 | assert url_or_file is not None, "I can't find enough information to connect to a hub!"\ |
|
367 | if url_or_file is None: | |
|
368 | raise ValueError( | |||
|
369 | "I can't find enough information to connect to a hub!" | |||
312 | " Please specify at least one of url_or_file or profile." |
|
370 | " Please specify at least one of url_or_file or profile." | |
|
371 | ) | |||
313 |
|
372 | |||
314 | if not util.is_url(url_or_file): |
|
373 | if not util.is_url(url_or_file): | |
315 | # it's not a url, try for a file |
|
374 | # it's not a url, try for a file | |
316 | if not os.path.exists(url_or_file): |
|
375 | if not os.path.exists(url_or_file): | |
317 | if self._cd: |
|
376 | if self._cd: | |
318 | url_or_file = os.path.join(self._cd.security_dir, url_or_file) |
|
377 | url_or_file = os.path.join(self._cd.security_dir, url_or_file) | |
319 | assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file |
|
378 | if not os.path.exists(url_or_file): | |
|
379 | raise IOError("Connection file not found: %r" % url_or_file) | |||
320 | with open(url_or_file) as f: |
|
380 | with open(url_or_file) as f: | |
321 | cfg = json.loads(f.read()) |
|
381 | cfg = json.loads(f.read()) | |
322 | else: |
|
382 | else: | |
@@ -369,7 +429,7 b' class Client(HasTraits):' | |||||
369 | if os.path.isfile(exec_key): |
|
429 | if os.path.isfile(exec_key): | |
370 | extra_args['keyfile'] = exec_key |
|
430 | extra_args['keyfile'] = exec_key | |
371 | else: |
|
431 | else: | |
372 |
exec_key = |
|
432 | exec_key = cast_bytes(exec_key) | |
373 | extra_args['key'] = exec_key |
|
433 | extra_args['key'] = exec_key | |
374 | self.session = Session(**extra_args) |
|
434 | self.session = Session(**extra_args) | |
375 |
|
435 | |||
@@ -467,7 +527,7 b' class Client(HasTraits):' | |||||
467 | if not isinstance(targets, (tuple, list, xrange)): |
|
527 | if not isinstance(targets, (tuple, list, xrange)): | |
468 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) |
|
528 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) | |
469 |
|
529 | |||
470 |
return [ |
|
530 | return [cast_bytes(self._engines[t]) for t in targets], list(targets) | |
471 |
|
531 | |||
472 | def _connect(self, sshserver, ssh_kwargs, timeout): |
|
532 | def _connect(self, sshserver, ssh_kwargs, timeout): | |
473 | """setup all our socket connections to the cluster. This is called from |
|
533 | """setup all our socket connections to the cluster. This is called from | |
@@ -628,7 +688,30 b' class Client(HasTraits):' | |||||
628 | print ("got unknown result: %s"%msg_id) |
|
688 | print ("got unknown result: %s"%msg_id) | |
629 | else: |
|
689 | else: | |
630 | self.outstanding.remove(msg_id) |
|
690 | self.outstanding.remove(msg_id) | |
631 | self.results[msg_id] = self._unwrap_exception(msg['content']) |
|
691 | ||
|
692 | content = msg['content'] | |||
|
693 | header = msg['header'] | |||
|
694 | ||||
|
695 | # construct metadata: | |||
|
696 | md = self.metadata[msg_id] | |||
|
697 | md.update(self._extract_metadata(header, parent, content)) | |||
|
698 | # is this redundant? | |||
|
699 | self.metadata[msg_id] = md | |||
|
700 | ||||
|
701 | e_outstanding = self._outstanding_dict[md['engine_uuid']] | |||
|
702 | if msg_id in e_outstanding: | |||
|
703 | e_outstanding.remove(msg_id) | |||
|
704 | ||||
|
705 | # construct result: | |||
|
706 | if content['status'] == 'ok': | |||
|
707 | self.results[msg_id] = ExecuteReply(msg_id, content, md) | |||
|
708 | elif content['status'] == 'aborted': | |||
|
709 | self.results[msg_id] = error.TaskAborted(msg_id) | |||
|
710 | elif content['status'] == 'resubmitted': | |||
|
711 | # TODO: handle resubmission | |||
|
712 | pass | |||
|
713 | else: | |||
|
714 | self.results[msg_id] = self._unwrap_exception(content) | |||
632 |
|
715 | |||
633 | def _handle_apply_reply(self, msg): |
|
716 | def _handle_apply_reply(self, msg): | |
634 | """Save the reply to an apply_request into our results.""" |
|
717 | """Save the reply to an apply_request into our results.""" | |
@@ -750,8 +833,13 b' class Client(HasTraits):' | |||||
750 | md.update({'pyerr' : self._unwrap_exception(content)}) |
|
833 | md.update({'pyerr' : self._unwrap_exception(content)}) | |
751 | elif msg_type == 'pyin': |
|
834 | elif msg_type == 'pyin': | |
752 | md.update({'pyin' : content['code']}) |
|
835 | md.update({'pyin' : content['code']}) | |
|
836 | elif msg_type == 'display_data': | |||
|
837 | md['outputs'].append(content) | |||
|
838 | elif msg_type == 'pyout': | |||
|
839 | md['pyout'] = content | |||
753 | else: |
|
840 | else: | |
754 | md.update({msg_type : content.get('data', '')}) |
|
841 | # unhandled msg_type (status, etc.) | |
|
842 | pass | |||
755 |
|
843 | |||
756 | # reduntant? |
|
844 | # reduntant? | |
757 | self.metadata[msg_id] = md |
|
845 | self.metadata[msg_id] = md | |
@@ -848,14 +936,14 b' class Client(HasTraits):' | |||||
848 | """ |
|
936 | """ | |
849 | if self._notification_socket: |
|
937 | if self._notification_socket: | |
850 | self._flush_notifications() |
|
938 | self._flush_notifications() | |
|
939 | if self._iopub_socket: | |||
|
940 | self._flush_iopub(self._iopub_socket) | |||
851 | if self._mux_socket: |
|
941 | if self._mux_socket: | |
852 | self._flush_results(self._mux_socket) |
|
942 | self._flush_results(self._mux_socket) | |
853 | if self._task_socket: |
|
943 | if self._task_socket: | |
854 | self._flush_results(self._task_socket) |
|
944 | self._flush_results(self._task_socket) | |
855 | if self._control_socket: |
|
945 | if self._control_socket: | |
856 | self._flush_control(self._control_socket) |
|
946 | self._flush_control(self._control_socket) | |
857 | if self._iopub_socket: |
|
|||
858 | self._flush_iopub(self._iopub_socket) |
|
|||
859 | if self._query_socket: |
|
947 | if self._query_socket: | |
860 | self._flush_ignored_hub_replies() |
|
948 | self._flush_ignored_hub_replies() | |
861 |
|
949 | |||
@@ -1024,14 +1112,16 b' class Client(HasTraits):' | |||||
1024 |
|
1112 | |||
1025 | return result |
|
1113 | return result | |
1026 |
|
1114 | |||
1027 |
def send_apply_ |
|
1115 | def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False, | |
1028 | ident=None): |
|
1116 | ident=None): | |
1029 | """construct and send an apply message via a socket. |
|
1117 | """construct and send an apply message via a socket. | |
1030 |
|
1118 | |||
1031 | This is the principal method with which all engine execution is performed by views. |
|
1119 | This is the principal method with which all engine execution is performed by views. | |
1032 | """ |
|
1120 | """ | |
1033 |
|
1121 | |||
1034 | assert not self._closed, "cannot use me anymore, I'm closed!" |
|
1122 | if self._closed: | |
|
1123 | raise RuntimeError("Client cannot be used after its sockets have been closed") | |||
|
1124 | ||||
1035 | # defaults: |
|
1125 | # defaults: | |
1036 | args = args if args is not None else [] |
|
1126 | args = args if args is not None else [] | |
1037 | kwargs = kwargs if kwargs is not None else {} |
|
1127 | kwargs = kwargs if kwargs is not None else {} | |
@@ -1066,6 +1156,43 b' class Client(HasTraits):' | |||||
1066 |
|
1156 | |||
1067 | return msg |
|
1157 | return msg | |
1068 |
|
1158 | |||
|
1159 | def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None): | |||
|
1160 | """construct and send an execute request via a socket. | |||
|
1161 | ||||
|
1162 | """ | |||
|
1163 | ||||
|
1164 | if self._closed: | |||
|
1165 | raise RuntimeError("Client cannot be used after its sockets have been closed") | |||
|
1166 | ||||
|
1167 | # defaults: | |||
|
1168 | subheader = subheader if subheader is not None else {} | |||
|
1169 | ||||
|
1170 | # validate arguments | |||
|
1171 | if not isinstance(code, basestring): | |||
|
1172 | raise TypeError("code must be text, not %s" % type(code)) | |||
|
1173 | if not isinstance(subheader, dict): | |||
|
1174 | raise TypeError("subheader must be dict, not %s" % type(subheader)) | |||
|
1175 | ||||
|
1176 | content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={}) | |||
|
1177 | ||||
|
1178 | ||||
|
1179 | msg = self.session.send(socket, "execute_request", content=content, ident=ident, | |||
|
1180 | subheader=subheader) | |||
|
1181 | ||||
|
1182 | msg_id = msg['header']['msg_id'] | |||
|
1183 | self.outstanding.add(msg_id) | |||
|
1184 | if ident: | |||
|
1185 | # possibly routed to a specific engine | |||
|
1186 | if isinstance(ident, list): | |||
|
1187 | ident = ident[-1] | |||
|
1188 | if ident in self._engines.values(): | |||
|
1189 | # save for later, in case of engine death | |||
|
1190 | self._outstanding_dict[ident].add(msg_id) | |||
|
1191 | self.history.append(msg_id) | |||
|
1192 | self.metadata[msg_id]['submitted'] = datetime.now() | |||
|
1193 | ||||
|
1194 | return msg | |||
|
1195 | ||||
1069 | #-------------------------------------------------------------------------- |
|
1196 | #-------------------------------------------------------------------------- | |
1070 | # construct a View object |
|
1197 | # construct a View object | |
1071 | #-------------------------------------------------------------------------- |
|
1198 | #-------------------------------------------------------------------------- | |
@@ -1221,12 +1348,6 b' class Client(HasTraits):' | |||||
1221 | raise TypeError("indices must be str or int, not %r"%id) |
|
1348 | raise TypeError("indices must be str or int, not %r"%id) | |
1222 | theids.append(id) |
|
1349 | theids.append(id) | |
1223 |
|
1350 | |||
1224 | for msg_id in theids: |
|
|||
1225 | self.outstanding.discard(msg_id) |
|
|||
1226 | if msg_id in self.history: |
|
|||
1227 | self.history.remove(msg_id) |
|
|||
1228 | self.results.pop(msg_id, None) |
|
|||
1229 | self.metadata.pop(msg_id, None) |
|
|||
1230 | content = dict(msg_ids = theids) |
|
1351 | content = dict(msg_ids = theids) | |
1231 |
|
1352 | |||
1232 | self.session.send(self._query_socket, 'resubmit_request', content) |
|
1353 | self.session.send(self._query_socket, 'resubmit_request', content) | |
@@ -1238,8 +1359,10 b' class Client(HasTraits):' | |||||
1238 | content = msg['content'] |
|
1359 | content = msg['content'] | |
1239 | if content['status'] != 'ok': |
|
1360 | if content['status'] != 'ok': | |
1240 | raise self._unwrap_exception(content) |
|
1361 | raise self._unwrap_exception(content) | |
|
1362 | mapping = content['resubmitted'] | |||
|
1363 | new_ids = [ mapping[msg_id] for msg_id in theids ] | |||
1241 |
|
1364 | |||
1242 |
ar = AsyncHubResult(self, msg_ids= |
|
1365 | ar = AsyncHubResult(self, msg_ids=new_ids) | |
1243 |
|
1366 | |||
1244 | if block: |
|
1367 | if block: | |
1245 | ar.wait() |
|
1368 | ar.wait() |
@@ -128,13 +128,20 b' class View(HasTraits):' | |||||
128 |
|
128 | |||
129 | assert not self.__class__ is View, "Don't use base View objects, use subclasses" |
|
129 | assert not self.__class__ is View, "Don't use base View objects, use subclasses" | |
130 |
|
130 | |||
131 |
|
||||
132 | def __repr__(self): |
|
131 | def __repr__(self): | |
133 | strtargets = str(self.targets) |
|
132 | strtargets = str(self.targets) | |
134 | if len(strtargets) > 16: |
|
133 | if len(strtargets) > 16: | |
135 | strtargets = strtargets[:12]+'...]' |
|
134 | strtargets = strtargets[:12]+'...]' | |
136 | return "<%s %s>"%(self.__class__.__name__, strtargets) |
|
135 | return "<%s %s>"%(self.__class__.__name__, strtargets) | |
137 |
|
136 | |||
|
137 | def __len__(self): | |||
|
138 | if isinstance(self.targets, list): | |||
|
139 | return len(self.targets) | |||
|
140 | elif isinstance(self.targets, int): | |||
|
141 | return 1 | |||
|
142 | else: | |||
|
143 | return len(self.client) | |||
|
144 | ||||
138 | def set_flags(self, **kwargs): |
|
145 | def set_flags(self, **kwargs): | |
139 | """set my attribute flags by keyword. |
|
146 | """set my attribute flags by keyword. | |
140 |
|
147 | |||
@@ -195,7 +202,7 b' class View(HasTraits):' | |||||
195 | @sync_results |
|
202 | @sync_results | |
196 | @save_ids |
|
203 | @save_ids | |
197 | def _really_apply(self, f, args, kwargs, block=None, **options): |
|
204 | def _really_apply(self, f, args, kwargs, block=None, **options): | |
198 |
"""wrapper for client.send_apply_ |
|
205 | """wrapper for client.send_apply_request""" | |
199 | raise NotImplementedError("Implement in subclasses") |
|
206 | raise NotImplementedError("Implement in subclasses") | |
200 |
|
207 | |||
201 | def apply(self, f, *args, **kwargs): |
|
208 | def apply(self, f, *args, **kwargs): | |
@@ -533,7 +540,7 b' class DirectView(View):' | |||||
533 | msg_ids = [] |
|
540 | msg_ids = [] | |
534 | trackers = [] |
|
541 | trackers = [] | |
535 | for ident in _idents: |
|
542 | for ident in _idents: | |
536 |
msg = self.client.send_apply_ |
|
543 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, | |
537 | ident=ident) |
|
544 | ident=ident) | |
538 | if track: |
|
545 | if track: | |
539 | trackers.append(msg['tracker']) |
|
546 | trackers.append(msg['tracker']) | |
@@ -547,6 +554,7 b' class DirectView(View):' | |||||
547 | pass |
|
554 | pass | |
548 | return ar |
|
555 | return ar | |
549 |
|
556 | |||
|
557 | ||||
550 | @spin_after |
|
558 | @spin_after | |
551 | def map(self, f, *sequences, **kwargs): |
|
559 | def map(self, f, *sequences, **kwargs): | |
552 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult |
|
560 | """view.map(f, *sequences, block=self.block) => list|AsyncMapResult | |
@@ -590,7 +598,9 b' class DirectView(View):' | |||||
590 | pf = ParallelFunction(self, f, block=block, **kwargs) |
|
598 | pf = ParallelFunction(self, f, block=block, **kwargs) | |
591 | return pf.map(*sequences) |
|
599 | return pf.map(*sequences) | |
592 |
|
600 | |||
593 | def execute(self, code, targets=None, block=None): |
|
601 | @sync_results | |
|
602 | @save_ids | |||
|
603 | def execute(self, code, silent=True, targets=None, block=None): | |||
594 | """Executes `code` on `targets` in blocking or nonblocking manner. |
|
604 | """Executes `code` on `targets` in blocking or nonblocking manner. | |
595 |
|
605 | |||
596 | ``execute`` is always `bound` (affects engine namespace) |
|
606 | ``execute`` is always `bound` (affects engine namespace) | |
@@ -604,7 +614,22 b' class DirectView(View):' | |||||
604 | whether or not to wait until done to return |
|
614 | whether or not to wait until done to return | |
605 | default: self.block |
|
615 | default: self.block | |
606 | """ |
|
616 | """ | |
607 | return self._really_apply(util._execute, args=(code,), block=block, targets=targets) |
|
617 | block = self.block if block is None else block | |
|
618 | targets = self.targets if targets is None else targets | |||
|
619 | ||||
|
620 | _idents = self.client._build_targets(targets)[0] | |||
|
621 | msg_ids = [] | |||
|
622 | trackers = [] | |||
|
623 | for ident in _idents: | |||
|
624 | msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident) | |||
|
625 | msg_ids.append(msg['header']['msg_id']) | |||
|
626 | ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets) | |||
|
627 | if block: | |||
|
628 | try: | |||
|
629 | ar.get() | |||
|
630 | except KeyboardInterrupt: | |||
|
631 | pass | |||
|
632 | return ar | |||
608 |
|
633 | |||
609 | def run(self, filename, targets=None, block=None): |
|
634 | def run(self, filename, targets=None, block=None): | |
610 | """Execute contents of `filename` on my engine(s). |
|
635 | """Execute contents of `filename` on my engine(s). | |
@@ -996,7 +1021,7 b' class LoadBalancedView(View):' | |||||
996 | follow = self._render_dependency(follow) |
|
1021 | follow = self._render_dependency(follow) | |
997 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) |
|
1022 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries) | |
998 |
|
1023 | |||
999 |
msg = self.client.send_apply_ |
|
1024 | msg = self.client.send_apply_request(self._socket, f, args, kwargs, track=track, | |
1000 | subheader=subheader) |
|
1025 | subheader=subheader) | |
1001 | tracker = None if track is False else msg['tracker'] |
|
1026 | tracker = None if track is False else msg['tracker'] | |
1002 |
|
1027 |
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice' | |||||
23 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
24 |
|
24 | |||
25 | from IPython.config.configurable import LoggingConfigurable |
|
25 | from IPython.config.configurable import LoggingConfigurable | |
|
26 | from IPython.utils.py3compat import str_to_bytes | |||
26 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer |
|
27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer | |
27 |
|
28 | |||
28 |
from IPython.parallel.util import |
|
29 | from IPython.parallel.util import log_errors | |
29 |
|
30 | |||
30 | class Heart(object): |
|
31 | class Heart(object): | |
31 | """A basic heart object for responding to a HeartMonitor. |
|
32 | """A basic heart object for responding to a HeartMonitor. | |
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):' | |||||
123 | self.responses = set() |
|
124 | self.responses = set() | |
124 | # print self.on_probation, self.hearts |
|
125 | # print self.on_probation, self.hearts | |
125 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
126 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) | |
126 |
self.pingstream.send( |
|
127 | self.pingstream.send(str_to_bytes(str(self.lifetime))) | |
127 | # flush stream to force immediate socket send |
|
128 | # flush stream to force immediate socket send | |
128 | self.pingstream.flush() |
|
129 | self.pingstream.flush() | |
129 |
|
130 | |||
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):' | |||||
151 | @log_errors |
|
152 | @log_errors | |
152 | def handle_pong(self, msg): |
|
153 | def handle_pong(self, msg): | |
153 | "a heart just beat" |
|
154 | "a heart just beat" | |
154 |
current = |
|
155 | current = str_to_bytes(str(self.lifetime)) | |
155 |
last = |
|
156 | last = str_to_bytes(str(self.last_ping)) | |
156 | if msg[1] == current: |
|
157 | if msg[1] == current: | |
157 | delta = time.time()-self.tic |
|
158 | delta = time.time()-self.tic | |
158 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
159 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream' | |||||
28 |
|
28 | |||
29 | # internal: |
|
29 | # internal: | |
30 | from IPython.utils.importstring import import_item |
|
30 | from IPython.utils.importstring import import_item | |
|
31 | from IPython.utils.py3compat import cast_bytes | |||
31 | from IPython.utils.traitlets import ( |
|
32 | from IPython.utils.traitlets import ( | |
32 | HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName |
|
33 | HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName | |
33 | ) |
|
34 | ) | |
@@ -441,7 +442,7 b' class Hub(SessionFactory):' | |||||
441 | for t in targets: |
|
442 | for t in targets: | |
442 | # map raw identities to ids |
|
443 | # map raw identities to ids | |
443 | if isinstance(t, (str,unicode)): |
|
444 | if isinstance(t, (str,unicode)): | |
444 | t = self.by_ident.get(t, t) |
|
445 | t = self.by_ident.get(cast_bytes(t), t) | |
445 | _targets.append(t) |
|
446 | _targets.append(t) | |
446 | targets = _targets |
|
447 | targets = _targets | |
447 | bad_targets = [ t for t in targets if t not in self.ids ] |
|
448 | bad_targets = [ t for t in targets if t not in self.ids ] | |
@@ -467,13 +468,13 b' class Hub(SessionFactory):' | |||||
467 | except ValueError: |
|
468 | except ValueError: | |
468 | idents=[] |
|
469 | idents=[] | |
469 | if not idents: |
|
470 | if not idents: | |
470 |
self.log.error(" |
|
471 | self.log.error("Monitor message without topic: %r", msg) | |
471 | return |
|
472 | return | |
472 | handler = self.monitor_handlers.get(switch, None) |
|
473 | handler = self.monitor_handlers.get(switch, None) | |
473 | if handler is not None: |
|
474 | if handler is not None: | |
474 | handler(idents, msg) |
|
475 | handler(idents, msg) | |
475 | else: |
|
476 | else: | |
476 |
self.log.error(" |
|
477 | self.log.error("Unrecognized monitor topic: %r", switch) | |
477 |
|
478 | |||
478 |
|
479 | |||
479 | @util.log_errors |
|
480 | @util.log_errors | |
@@ -719,14 +720,17 b' class Hub(SessionFactory):' | |||||
719 | self.unassigned.remove(msg_id) |
|
720 | self.unassigned.remove(msg_id) | |
720 |
|
721 | |||
721 | header = msg['header'] |
|
722 | header = msg['header'] | |
722 |
engine_uuid = header.get('engine', |
|
723 | engine_uuid = header.get('engine', u'') | |
723 | eid = self.by_ident.get(engine_uuid, None) |
|
724 | eid = self.by_ident.get(cast_bytes(engine_uuid), None) | |
|
725 | ||||
|
726 | status = header.get('status', None) | |||
724 |
|
727 | |||
725 | if msg_id in self.pending: |
|
728 | if msg_id in self.pending: | |
726 | self.log.info("task::task %r finished on %s", msg_id, eid) |
|
729 | self.log.info("task::task %r finished on %s", msg_id, eid) | |
727 | self.pending.remove(msg_id) |
|
730 | self.pending.remove(msg_id) | |
728 | self.all_completed.add(msg_id) |
|
731 | self.all_completed.add(msg_id) | |
729 | if eid is not None: |
|
732 | if eid is not None: | |
|
733 | if status != 'aborted': | |||
730 | self.completed[eid].append(msg_id) |
|
734 | self.completed[eid].append(msg_id) | |
731 | if msg_id in self.tasks[eid]: |
|
735 | if msg_id in self.tasks[eid]: | |
732 | self.tasks[eid].remove(msg_id) |
|
736 | self.tasks[eid].remove(msg_id) | |
@@ -760,7 +764,7 b' class Hub(SessionFactory):' | |||||
760 | # print (content) |
|
764 | # print (content) | |
761 | msg_id = content['msg_id'] |
|
765 | msg_id = content['msg_id'] | |
762 | engine_uuid = content['engine_id'] |
|
766 | engine_uuid = content['engine_id'] | |
763 |
eid = self.by_ident[ |
|
767 | eid = self.by_ident[cast_bytes(engine_uuid)] | |
764 |
|
768 | |||
765 | self.log.info("task::task %r arrived on %r", msg_id, eid) |
|
769 | self.log.info("task::task %r arrived on %r", msg_id, eid) | |
766 | if msg_id in self.unassigned: |
|
770 | if msg_id in self.unassigned: | |
@@ -796,7 +800,7 b' class Hub(SessionFactory):' | |||||
796 |
|
800 | |||
797 | parent = msg['parent_header'] |
|
801 | parent = msg['parent_header'] | |
798 | if not parent: |
|
802 | if not parent: | |
799 |
self.log. |
|
803 | self.log.warn("iopub::IOPub message lacks parent: %r", msg) | |
800 | return |
|
804 | return | |
801 | msg_id = parent['msg_id'] |
|
805 | msg_id = parent['msg_id'] | |
802 | msg_type = msg['header']['msg_type'] |
|
806 | msg_type = msg['header']['msg_type'] | |
@@ -850,13 +854,13 b' class Hub(SessionFactory):' | |||||
850 | """Register a new engine.""" |
|
854 | """Register a new engine.""" | |
851 | content = msg['content'] |
|
855 | content = msg['content'] | |
852 | try: |
|
856 | try: | |
853 |
queue = |
|
857 | queue = cast_bytes(content['queue']) | |
854 | except KeyError: |
|
858 | except KeyError: | |
855 | self.log.error("registration::queue not specified", exc_info=True) |
|
859 | self.log.error("registration::queue not specified", exc_info=True) | |
856 | return |
|
860 | return | |
857 | heart = content.get('heartbeat', None) |
|
861 | heart = content.get('heartbeat', None) | |
858 | if heart: |
|
862 | if heart: | |
859 |
heart = |
|
863 | heart = cast_bytes(heart) | |
860 | """register a new engine, and create the socket(s) necessary""" |
|
864 | """register a new engine, and create the socket(s) necessary""" | |
861 | eid = self._next_id |
|
865 | eid = self._next_id | |
862 | # print (eid, queue, reg, heart) |
|
866 | # print (eid, queue, reg, heart) | |
@@ -1130,7 +1134,7 b' class Hub(SessionFactory):' | |||||
1130 |
|
1134 | |||
1131 | # validate msg_ids |
|
1135 | # validate msg_ids | |
1132 | found_ids = [ rec['msg_id'] for rec in records ] |
|
1136 | found_ids = [ rec['msg_id'] for rec in records ] | |
1133 | invalid_ids = filter(lambda m: m in self.pending, found_ids) |
|
1137 | pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ] | |
1134 | if len(records) > len(msg_ids): |
|
1138 | if len(records) > len(msg_ids): | |
1135 | try: |
|
1139 | try: | |
1136 | raise RuntimeError("DB appears to be in an inconsistent state." |
|
1140 | raise RuntimeError("DB appears to be in an inconsistent state." | |
@@ -1143,40 +1147,46 b' class Hub(SessionFactory):' | |||||
1143 | raise KeyError("No such msg(s): %r" % missing) |
|
1147 | raise KeyError("No such msg(s): %r" % missing) | |
1144 | except KeyError: |
|
1148 | except KeyError: | |
1145 | return finish(error.wrap_exception()) |
|
1149 | return finish(error.wrap_exception()) | |
1146 |
elif |
|
1150 | elif pending_ids: | |
1147 | msg_id = invalid_ids[0] |
|
1151 | pass | |
1148 | try: |
|
1152 | # no need to raise on resubmit of pending task, now that we | |
1149 | raise ValueError("Task %r appears to be inflight" % msg_id) |
|
1153 | # resubmit under new ID, but do we want to raise anyway? | |
1150 | except Exception: |
|
1154 | # msg_id = invalid_ids[0] | |
1151 | return finish(error.wrap_exception()) |
|
1155 | # try: | |
|
1156 | # raise ValueError("Task(s) %r appears to be inflight" % ) | |||
|
1157 | # except Exception: | |||
|
1158 | # return finish(error.wrap_exception()) | |||
|
1159 | ||||
|
1160 | # mapping of original IDs to resubmitted IDs | |||
|
1161 | resubmitted = {} | |||
1152 |
|
1162 | |||
1153 | # clear the existing records |
|
|||
1154 | now = datetime.now() |
|
|||
1155 | rec = empty_record() |
|
|||
1156 | map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) |
|
|||
1157 | rec['resubmitted'] = now |
|
|||
1158 | rec['queue'] = 'task' |
|
|||
1159 | rec['client_uuid'] = client_id[0] |
|
|||
1160 | try: |
|
|||
1161 | for msg_id in msg_ids: |
|
|||
1162 | self.all_completed.discard(msg_id) |
|
|||
1163 | self.db.update_record(msg_id, rec) |
|
|||
1164 | except Exception: |
|
|||
1165 | self.log.error('db::db error upating record', exc_info=True) |
|
|||
1166 | reply = error.wrap_exception() |
|
|||
1167 | else: |
|
|||
1168 |
|
|
1163 | # send the messages | |
1169 |
|
|
1164 | for rec in records: | |
1170 |
|
|
1165 | header = rec['header'] | |
1171 | # include resubmitted in header to prevent digest collision |
|
|||
1172 | header['resubmitted'] = now |
|
|||
1173 |
|
|
1166 | msg = self.session.msg(header['msg_type']) | |
|
1167 | msg_id = msg['msg_id'] | |||
1174 |
|
|
1168 | msg['content'] = rec['content'] | |
|
1169 | header.update(msg['header']) | |||
1175 |
|
|
1170 | msg['header'] = header | |
1176 | msg['header']['msg_id'] = rec['msg_id'] |
|
1171 | ||
1177 |
|
|
1172 | self.session.send(self.resubmit, msg, buffers=rec['buffers']) | |
1178 |
|
1173 | |||
1179 | finish(dict(status='ok')) |
|
1174 | resubmitted[rec['msg_id']] = msg_id | |
|
1175 | self.pending.add(msg_id) | |||
|
1176 | msg['buffers'] = [] | |||
|
1177 | try: | |||
|
1178 | self.db.add_record(msg_id, init_record(msg)) | |||
|
1179 | except Exception: | |||
|
1180 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |||
|
1181 | ||||
|
1182 | finish(dict(status='ok', resubmitted=resubmitted)) | |||
|
1183 | ||||
|
1184 | # store the new IDs in the Task DB | |||
|
1185 | for msg_id, resubmit_id in resubmitted.iteritems(): | |||
|
1186 | try: | |||
|
1187 | self.db.update_record(msg_id, {'resubmitted' : resubmit_id}) | |||
|
1188 | except Exception: | |||
|
1189 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |||
1180 |
|
1190 | |||
1181 |
|
1191 | |||
1182 | def _extract_record(self, rec): |
|
1192 | def _extract_record(self, rec): |
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator' | |||||
42 | from IPython.config.application import Application |
|
42 | from IPython.config.application import Application | |
43 | from IPython.config.loader import Config |
|
43 | from IPython.config.loader import Config | |
44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes |
|
44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes | |
|
45 | from IPython.utils.py3compat import cast_bytes | |||
45 |
|
46 | |||
46 | from IPython.parallel import error, util |
|
47 | from IPython.parallel import error, util | |
47 | from IPython.parallel.factory import SessionFactory |
|
48 | from IPython.parallel.factory import SessionFactory | |
48 |
from IPython.parallel.util import connect_logger, local_logger |
|
49 | from IPython.parallel.util import connect_logger, local_logger | |
49 |
|
50 | |||
50 | from .dependency import Dependency |
|
51 | from .dependency import Dependency | |
51 |
|
52 | |||
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):' | |||||
262 | self.log.error("Unhandled message type: %r"%msg_type) |
|
263 | self.log.error("Unhandled message type: %r"%msg_type) | |
263 | else: |
|
264 | else: | |
264 | try: |
|
265 | try: | |
265 | handler(asbytes(msg['content']['queue'])) |
|
266 | handler(cast_bytes(msg['content']['queue'])) | |
266 | except Exception: |
|
267 | except Exception: | |
267 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
268 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) | |
268 |
|
269 | |||
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):' | |||||
316 | # prevent double-handling of messages |
|
317 | # prevent double-handling of messages | |
317 | continue |
|
318 | continue | |
318 |
|
319 | |||
319 |
raw_msg = lost[msg_id] |
|
320 | raw_msg = lost[msg_id].raw_msg | |
320 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
321 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
321 | parent = self.session.unpack(msg[1].bytes) |
|
322 | parent = self.session.unpack(msg[1].bytes) | |
322 | idents = [engine, idents[0]] |
|
323 | idents = [engine, idents[0]] | |
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):' | |||||
370 | # get targets as a set of bytes objects |
|
371 | # get targets as a set of bytes objects | |
371 | # from a list of unicode objects |
|
372 | # from a list of unicode objects | |
372 | targets = header.get('targets', []) |
|
373 | targets = header.get('targets', []) | |
373 | targets = map(asbytes, targets) |
|
374 | targets = map(cast_bytes, targets) | |
374 | targets = set(targets) |
|
375 | targets = set(targets) | |
375 |
|
376 | |||
376 | retries = header.get('retries', 0) |
|
377 | retries = header.get('retries', 0) |
@@ -138,7 +138,7 b' class SQLiteDB(BaseDB):' | |||||
138 | 'engine_uuid' : 'text', |
|
138 | 'engine_uuid' : 'text', | |
139 | 'started' : 'timestamp', |
|
139 | 'started' : 'timestamp', | |
140 | 'completed' : 'timestamp', |
|
140 | 'completed' : 'timestamp', | |
141 |
'resubmitted' : 't |
|
141 | 'resubmitted' : 'text', | |
142 | 'received' : 'timestamp', |
|
142 | 'received' : 'timestamp', | |
143 | 'result_header' : 'dict text', |
|
143 | 'result_header' : 'dict text', | |
144 | 'result_content' : 'dict text', |
|
144 | 'result_content' : 'dict text', | |
@@ -226,15 +226,16 b' class SQLiteDB(BaseDB):' | |||||
226 | # isolation_level = None)#, |
|
226 | # isolation_level = None)#, | |
227 | cached_statements=64) |
|
227 | cached_statements=64) | |
228 | # print dir(self._db) |
|
228 | # print dir(self._db) | |
229 | first_table = self.table |
|
229 | first_table = previous_table = self.table | |
230 | i=0 |
|
230 | i=0 | |
231 | while not self._check_table(): |
|
231 | while not self._check_table(): | |
232 | i+=1 |
|
232 | i+=1 | |
233 | self.table = first_table+'_%i'%i |
|
233 | self.table = first_table+'_%i'%i | |
234 | self.log.warn( |
|
234 | self.log.warn( | |
235 | "Table %s exists and doesn't match db format, trying %s"% |
|
235 | "Table %s exists and doesn't match db format, trying %s"% | |
236 |
( |
|
236 | (previous_table, self.table) | |
237 | ) |
|
237 | ) | |
|
238 | previous_table = self.table | |||
238 |
|
239 | |||
239 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s |
|
240 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s | |
240 | (msg_id text PRIMARY KEY, |
|
241 | (msg_id text PRIMARY KEY, | |
@@ -246,7 +247,7 b' class SQLiteDB(BaseDB):' | |||||
246 | engine_uuid text, |
|
247 | engine_uuid text, | |
247 | started timestamp, |
|
248 | started timestamp, | |
248 | completed timestamp, |
|
249 | completed timestamp, | |
249 |
resubmitted t |
|
250 | resubmitted text, | |
250 | received timestamp, |
|
251 | received timestamp, | |
251 | result_header dict text, |
|
252 | result_header dict text, | |
252 | result_content dict text, |
|
253 | result_content dict text, |
@@ -27,15 +27,14 b' from IPython.external.ssh import tunnel' | |||||
27 | from IPython.utils.traitlets import ( |
|
27 | from IPython.utils.traitlets import ( | |
28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool |
|
28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool | |
29 | ) |
|
29 | ) | |
30 |
from IPython.utils import |
|
30 | from IPython.utils.py3compat import cast_bytes | |
31 |
|
31 | |||
32 | from IPython.parallel.controller.heartmonitor import Heart |
|
32 | from IPython.parallel.controller.heartmonitor import Heart | |
33 | from IPython.parallel.factory import RegistrationFactory |
|
33 | from IPython.parallel.factory import RegistrationFactory | |
34 |
from IPython.parallel.util import disambiguate_url |
|
34 | from IPython.parallel.util import disambiguate_url | |
35 |
|
35 | |||
36 | from IPython.zmq.session import Message |
|
36 | from IPython.zmq.session import Message | |
37 |
|
37 | from IPython.zmq.ipkernel import Kernel | ||
38 | from .streamkernel import Kernel |
|
|||
39 |
|
38 | |||
40 | class EngineFactory(RegistrationFactory): |
|
39 | class EngineFactory(RegistrationFactory): | |
41 | """IPython engine""" |
|
40 | """IPython engine""" | |
@@ -70,7 +69,7 b' class EngineFactory(RegistrationFactory):' | |||||
70 | bident = CBytes() |
|
69 | bident = CBytes() | |
71 | ident = Unicode() |
|
70 | ident = Unicode() | |
72 | def _ident_changed(self, name, old, new): |
|
71 | def _ident_changed(self, name, old, new): | |
73 | self.bident = asbytes(new) |
|
72 | self.bident = cast_bytes(new) | |
74 | using_ssh=Bool(False) |
|
73 | using_ssh=Bool(False) | |
75 |
|
74 | |||
76 |
|
75 | |||
@@ -185,23 +184,27 b' class EngineFactory(RegistrationFactory):' | |||||
185 |
|
184 | |||
186 | # create iopub stream: |
|
185 | # create iopub stream: | |
187 | iopub_addr = msg.content.iopub |
|
186 | iopub_addr = msg.content.iopub | |
188 |
iopub_s |
|
187 | iopub_socket = ctx.socket(zmq.PUB) | |
189 |
iopub_s |
|
188 | iopub_socket.setsockopt(zmq.IDENTITY, identity) | |
190 |
connect(iopub_s |
|
189 | connect(iopub_socket, iopub_addr) | |
|
190 | ||||
|
191 | # disable history: | |||
|
192 | self.config.HistoryManager.hist_file = ':memory:' | |||
191 |
|
193 | |||
192 |
|
|
194 | # Redirect input streams and set a display hook. | |
193 | if self.out_stream_factory: |
|
195 | if self.out_stream_factory: | |
194 |
sys.stdout = self.out_stream_factory(self.session, iopub_s |
|
196 | sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') | |
195 |
sys.stdout.topic = |
|
197 | sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id) | |
196 |
sys.stderr = self.out_stream_factory(self.session, iopub_s |
|
198 | sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') | |
197 |
sys.stderr.topic = |
|
199 | sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id) | |
198 | if self.display_hook_factory: |
|
200 | if self.display_hook_factory: | |
199 |
sys.displayhook = self.display_hook_factory(self.session, iopub_s |
|
201 | sys.displayhook = self.display_hook_factory(self.session, iopub_socket) | |
200 |
sys.displayhook.topic = |
|
202 | sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id) | |
201 |
|
203 | |||
202 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, |
|
204 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, | |
203 |
control_stream=control_stream, shell_streams=shell_streams, iopub_s |
|
205 | control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, | |
204 |
loop=loop, user_ns |
|
206 | loop=loop, user_ns=self.user_ns, log=self.log) | |
|
207 | self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) | |||
205 | self.kernel.start() |
|
208 | self.kernel.start() | |
206 |
|
209 | |||
207 |
|
210 |
@@ -65,7 +65,7 b' def setup():' | |||||
65 | if cp.poll() is not None: |
|
65 | if cp.poll() is not None: | |
66 | print cp.poll() |
|
66 | print cp.poll() | |
67 | raise RuntimeError("The test controller failed to start.") |
|
67 | raise RuntimeError("The test controller failed to start.") | |
68 |
elif time.time()-tic > 1 |
|
68 | elif time.time()-tic > 15: | |
69 | raise RuntimeError("Timeout waiting for the test controller to start.") |
|
69 | raise RuntimeError("Timeout waiting for the test controller to start.") | |
70 | time.sleep(0.1) |
|
70 | time.sleep(0.1) | |
71 | add_engines(1) |
|
71 | add_engines(1) | |
@@ -93,7 +93,7 b" def add_engines(n=1, profile='iptest', total=False):" | |||||
93 | while len(rc) < base+n: |
|
93 | while len(rc) < base+n: | |
94 | if any([ ep.poll() is not None for ep in eps ]): |
|
94 | if any([ ep.poll() is not None for ep in eps ]): | |
95 | raise RuntimeError("A test engine failed to start.") |
|
95 | raise RuntimeError("A test engine failed to start.") | |
96 |
elif time.time()-tic > 1 |
|
96 | elif time.time()-tic > 15: | |
97 | raise RuntimeError("Timeout waiting for engines to connect.") |
|
97 | raise RuntimeError("Timeout waiting for engines to connect.") | |
98 | time.sleep(.1) |
|
98 | time.sleep(.1) | |
99 | rc.spin() |
|
99 | rc.spin() |
@@ -154,7 +154,7 b' class AsyncResultTest(ClusterTestCase):' | |||||
154 | ar = v.apply_async(time.sleep, 0.25) |
|
154 | ar = v.apply_async(time.sleep, 0.25) | |
155 | self.assertRaises(TimeoutError, getattr, ar, 'serial_time') |
|
155 | self.assertRaises(TimeoutError, getattr, ar, 'serial_time') | |
156 | ar.get(2) |
|
156 | ar.get(2) | |
157 |
self.assertTrue(ar.serial_time < |
|
157 | self.assertTrue(ar.serial_time < 1.) | |
158 | self.assertTrue(ar.serial_time > 0.2) |
|
158 | self.assertTrue(ar.serial_time > 0.2) | |
159 |
|
159 | |||
160 | def test_serial_time_multi(self): |
|
160 | def test_serial_time_multi(self): | |
@@ -171,8 +171,8 b' class AsyncResultTest(ClusterTestCase):' | |||||
171 | ar = v.apply_async(time.sleep, 0.25) |
|
171 | ar = v.apply_async(time.sleep, 0.25) | |
172 | while not ar.ready(): |
|
172 | while not ar.ready(): | |
173 | time.sleep(0.01) |
|
173 | time.sleep(0.01) | |
174 |
self.assertTrue(ar.elapsed < |
|
174 | self.assertTrue(ar.elapsed < 1) | |
175 |
self.assertTrue(ar.elapsed < |
|
175 | self.assertTrue(ar.elapsed < 1) | |
176 | ar.get(2) |
|
176 | ar.get(2) | |
177 |
|
177 | |||
178 | def test_elapsed_multi(self): |
|
178 | def test_elapsed_multi(self): | |
@@ -180,8 +180,8 b' class AsyncResultTest(ClusterTestCase):' | |||||
180 | ar = v.apply_async(time.sleep, 0.25) |
|
180 | ar = v.apply_async(time.sleep, 0.25) | |
181 | while not ar.ready(): |
|
181 | while not ar.ready(): | |
182 | time.sleep(0.01) |
|
182 | time.sleep(0.01) | |
183 |
self.assertTrue(ar.elapsed < |
|
183 | self.assertTrue(ar.elapsed < 1) | |
184 |
self.assertTrue(ar.elapsed < |
|
184 | self.assertTrue(ar.elapsed < 1) | |
185 | ar.get(2) |
|
185 | ar.get(2) | |
186 |
|
186 | |||
187 | def test_hubresult_timestamps(self): |
|
187 | def test_hubresult_timestamps(self): |
@@ -280,6 +280,25 b' class TestClient(ClusterTestCase):' | |||||
280 | time.sleep(0.25) |
|
280 | time.sleep(0.25) | |
281 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) |
|
281 | self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) | |
282 |
|
282 | |||
|
283 | def _wait_for_idle(self): | |||
|
284 | """wait for an engine to become idle, according to the Hub""" | |||
|
285 | rc = self.client | |||
|
286 | ||||
|
287 | # timeout 2s, polling every 100ms | |||
|
288 | for i in range(20): | |||
|
289 | qs = rc.queue_status() | |||
|
290 | if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids): | |||
|
291 | time.sleep(0.1) | |||
|
292 | else: | |||
|
293 | break | |||
|
294 | ||||
|
295 | # ensure Hub up to date: | |||
|
296 | qs = rc.queue_status() | |||
|
297 | self.assertEquals(qs['unassigned'], 0) | |||
|
298 | for eid in rc.ids: | |||
|
299 | self.assertEquals(qs[eid]['tasks'], 0) | |||
|
300 | ||||
|
301 | ||||
283 | def test_resubmit(self): |
|
302 | def test_resubmit(self): | |
284 | def f(): |
|
303 | def f(): | |
285 | import random |
|
304 | import random | |
@@ -288,19 +307,38 b' class TestClient(ClusterTestCase):' | |||||
288 | ar = v.apply_async(f) |
|
307 | ar = v.apply_async(f) | |
289 | r1 = ar.get(1) |
|
308 | r1 = ar.get(1) | |
290 | # give the Hub a chance to notice: |
|
309 | # give the Hub a chance to notice: | |
291 | time.sleep(0.5) |
|
310 | self._wait_for_idle() | |
292 | ahr = self.client.resubmit(ar.msg_ids) |
|
311 | ahr = self.client.resubmit(ar.msg_ids) | |
293 | r2 = ahr.get(1) |
|
312 | r2 = ahr.get(1) | |
294 | self.assertFalse(r1 == r2) |
|
313 | self.assertFalse(r1 == r2) | |
295 |
|
314 | |||
|
315 | def test_resubmit_aborted(self): | |||
|
316 | def f(): | |||
|
317 | import random | |||
|
318 | return random.random() | |||
|
319 | v = self.client.load_balanced_view() | |||
|
320 | # restrict to one engine, so we can put a sleep | |||
|
321 | # ahead of the task, so it will get aborted | |||
|
322 | eid = self.client.ids[-1] | |||
|
323 | v.targets = [eid] | |||
|
324 | sleep = v.apply_async(time.sleep, 0.5) | |||
|
325 | ar = v.apply_async(f) | |||
|
326 | ar.abort() | |||
|
327 | self.assertRaises(error.TaskAborted, ar.get) | |||
|
328 | # Give the Hub a chance to get up to date: | |||
|
329 | self._wait_for_idle() | |||
|
330 | ahr = self.client.resubmit(ar.msg_ids) | |||
|
331 | r2 = ahr.get(1) | |||
|
332 | ||||
296 | def test_resubmit_inflight(self): |
|
333 | def test_resubmit_inflight(self): | |
297 |
""" |
|
334 | """resubmit of inflight task""" | |
298 | v = self.client.load_balanced_view() |
|
335 | v = self.client.load_balanced_view() | |
299 | ar = v.apply_async(time.sleep,1) |
|
336 | ar = v.apply_async(time.sleep,1) | |
300 | # give the message a chance to arrive |
|
337 | # give the message a chance to arrive | |
301 | time.sleep(0.2) |
|
338 | time.sleep(0.2) | |
302 |
|
|
339 | ahr = self.client.resubmit(ar.msg_ids) | |
303 | ar.get(2) |
|
340 | ar.get(2) | |
|
341 | ahr.get(2) | |||
304 |
|
342 | |||
305 | def test_resubmit_badkey(self): |
|
343 | def test_resubmit_badkey(self): | |
306 | """ensure KeyError on resubmit of nonexistant task""" |
|
344 | """ensure KeyError on resubmit of nonexistant task""" |
@@ -18,6 +18,7 b' Authors:' | |||||
18 |
|
18 | |||
19 | from __future__ import division |
|
19 | from __future__ import division | |
20 |
|
20 | |||
|
21 | import logging | |||
21 | import os |
|
22 | import os | |
22 | import tempfile |
|
23 | import tempfile | |
23 | import time |
|
24 | import time | |
@@ -226,7 +227,9 b' class TestSQLiteBackend(TestDictBackend):' | |||||
226 | @dec.skip_without('sqlite3') |
|
227 | @dec.skip_without('sqlite3') | |
227 | def create_db(self): |
|
228 | def create_db(self): | |
228 | location, fname = os.path.split(temp_db) |
|
229 | location, fname = os.path.split(temp_db) | |
229 | return SQLiteDB(location=location, fname=fname) |
|
230 | log = logging.getLogger('test') | |
|
231 | log.setLevel(logging.CRITICAL) | |||
|
232 | return SQLiteDB(location=location, fname=fname, log=log) | |||
230 |
|
233 | |||
231 | def tearDown(self): |
|
234 | def tearDown(self): | |
232 | self.db._db.close() |
|
235 | self.db._db.close() |
@@ -25,6 +25,7 b' import zmq' | |||||
25 | from nose import SkipTest |
|
25 | from nose import SkipTest | |
26 |
|
26 | |||
27 | from IPython.testing import decorators as dec |
|
27 | from IPython.testing import decorators as dec | |
|
28 | from IPython.testing.ipunittest import ParametricTestCase | |||
28 |
|
29 | |||
29 | from IPython import parallel as pmod |
|
30 | from IPython import parallel as pmod | |
30 | from IPython.parallel import error |
|
31 | from IPython.parallel import error | |
@@ -39,7 +40,7 b' from .clienttest import ClusterTestCase, crash, wait, skip_without' | |||||
39 | def setup(): |
|
40 | def setup(): | |
40 | add_engines(3, total=True) |
|
41 | add_engines(3, total=True) | |
41 |
|
42 | |||
42 | class TestView(ClusterTestCase): |
|
43 | class TestView(ClusterTestCase, ParametricTestCase): | |
43 |
|
44 | |||
44 | def test_z_crash_mux(self): |
|
45 | def test_z_crash_mux(self): | |
45 | """test graceful handling of engine death (direct)""" |
|
46 | """test graceful handling of engine death (direct)""" | |
@@ -421,16 +422,16 b' class TestView(ClusterTestCase):' | |||||
421 | sys.stdout = sio |
|
422 | sys.stdout = sio | |
422 | ip.magic_autopx() |
|
423 | ip.magic_autopx() | |
423 | ip.run_cell('\n'.join(('a=5','b=10','c=0'))) |
|
424 | ip.run_cell('\n'.join(('a=5','b=10','c=0'))) | |
424 |
ip.run_cell(' |
|
425 | ip.run_cell('b*=2') | |
|
426 | ip.run_cell('print (b)') | |||
425 | ip.run_cell("b/c") |
|
427 | ip.run_cell("b/c") | |
426 | ip.run_code(compile('b*=2', '', 'single')) |
|
|||
427 | ip.magic_autopx() |
|
428 | ip.magic_autopx() | |
428 | sys.stdout = savestdout |
|
429 | sys.stdout = savestdout | |
429 | output = sio.getvalue().strip() |
|
430 | output = sio.getvalue().strip() | |
430 | self.assertTrue(output.startswith('%autopx enabled')) |
|
431 | self.assertTrue(output.startswith('%autopx enabled')) | |
431 | self.assertTrue(output.endswith('%autopx disabled')) |
|
432 | self.assertTrue(output.endswith('%autopx disabled')) | |
432 | self.assertTrue('RemoteError: ZeroDivisionError' in output) |
|
433 | self.assertTrue('RemoteError: ZeroDivisionError' in output) | |
433 |
ar = v.get_result(- |
|
434 | ar = v.get_result(-1) | |
434 | self.assertEquals(v['a'], 5) |
|
435 | self.assertEquals(v['a'], 5) | |
435 | self.assertEquals(v['b'], 20) |
|
436 | self.assertEquals(v['b'], 20) | |
436 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
|
437 | self.assertRaisesRemote(ZeroDivisionError, ar.get) | |
@@ -446,9 +447,10 b' class TestView(ClusterTestCase):' | |||||
446 | sys.stdout = sio |
|
447 | sys.stdout = sio | |
447 | ip.magic_autopx() |
|
448 | ip.magic_autopx() | |
448 | ip.run_cell('\n'.join(('a=5','b=10','c=0'))) |
|
449 | ip.run_cell('\n'.join(('a=5','b=10','c=0'))) | |
449 | ip.run_cell('print b') |
|
450 | ip.run_cell('print (b)') | |
|
451 | ip.run_cell('import time; time.sleep(0.1)') | |||
450 | ip.run_cell("b/c") |
|
452 | ip.run_cell("b/c") | |
451 |
ip.run_c |
|
453 | ip.run_cell('b*=2') | |
452 | ip.magic_autopx() |
|
454 | ip.magic_autopx() | |
453 | sys.stdout = savestdout |
|
455 | sys.stdout = savestdout | |
454 | output = sio.getvalue().strip() |
|
456 | output = sio.getvalue().strip() | |
@@ -456,9 +458,12 b' class TestView(ClusterTestCase):' | |||||
456 | self.assertTrue(output.endswith('%autopx disabled')) |
|
458 | self.assertTrue(output.endswith('%autopx disabled')) | |
457 | self.assertFalse('ZeroDivisionError' in output) |
|
459 | self.assertFalse('ZeroDivisionError' in output) | |
458 | ar = v.get_result(-2) |
|
460 | ar = v.get_result(-2) | |
459 | self.assertEquals(v['a'], 5) |
|
|||
460 | self.assertEquals(v['b'], 20) |
|
|||
461 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
|
461 | self.assertRaisesRemote(ZeroDivisionError, ar.get) | |
|
462 | # prevent TaskAborted on pulls, due to ZeroDivisionError | |||
|
463 | time.sleep(0.5) | |||
|
464 | self.assertEquals(v['a'], 5) | |||
|
465 | # b*=2 will not fire, due to abort | |||
|
466 | self.assertEquals(v['b'], 10) | |||
462 |
|
467 | |||
463 | def test_magic_result(self): |
|
468 | def test_magic_result(self): | |
464 | ip = get_ipython() |
|
469 | ip = get_ipython() | |
@@ -551,3 +556,139 b' class TestView(ClusterTestCase):' | |||||
551 | result = e0.map_sync(lambda x: -1*x, r) |
|
556 | result = e0.map_sync(lambda x: -1*x, r) | |
552 | self.assertEquals(result, check) |
|
557 | self.assertEquals(result, check) | |
553 |
|
558 | |||
|
559 | def test_len(self): | |||
|
560 | """len(view) makes sense""" | |||
|
561 | e0 = self.client[self.client.ids[0]] | |||
|
562 | yield self.assertEquals(len(e0), 1) | |||
|
563 | v = self.client[:] | |||
|
564 | yield self.assertEquals(len(v), len(self.client.ids)) | |||
|
565 | v = self.client.direct_view('all') | |||
|
566 | yield self.assertEquals(len(v), len(self.client.ids)) | |||
|
567 | v = self.client[:2] | |||
|
568 | yield self.assertEquals(len(v), 2) | |||
|
569 | v = self.client[:1] | |||
|
570 | yield self.assertEquals(len(v), 1) | |||
|
571 | v = self.client.load_balanced_view() | |||
|
572 | yield self.assertEquals(len(v), len(self.client.ids)) | |||
|
573 | # parametric tests seem to require manual closing? | |||
|
574 | self.client.close() | |||
|
575 | ||||
|
576 | ||||
|
577 | # begin execute tests | |||
|
578 | def _wait_for(self, f, timeout=10): | |||
|
579 | tic = time.time() | |||
|
580 | while time.time() <= tic + timeout: | |||
|
581 | if f(): | |||
|
582 | return | |||
|
583 | time.sleep(0.1) | |||
|
584 | self.client.spin() | |||
|
585 | if not f(): | |||
|
586 | print "Warning: Awaited condition never arrived" | |||
|
587 | ||||
|
588 | ||||
|
589 | def test_execute_reply(self): | |||
|
590 | e0 = self.client[self.client.ids[0]] | |||
|
591 | e0.block = True | |||
|
592 | ar = e0.execute("5", silent=False) | |||
|
593 | er = ar.get() | |||
|
594 | self._wait_for(lambda : bool(er.pyout)) | |||
|
595 | self.assertEquals(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count) | |||
|
596 | self.assertEquals(er.pyout['data']['text/plain'], '5') | |||
|
597 | ||||
|
598 | def test_execute_reply_stdout(self): | |||
|
599 | e0 = self.client[self.client.ids[0]] | |||
|
600 | e0.block = True | |||
|
601 | ar = e0.execute("print (5)", silent=False) | |||
|
602 | er = ar.get() | |||
|
603 | self._wait_for(lambda : bool(er.stdout)) | |||
|
604 | self.assertEquals(er.stdout.strip(), '5') | |||
|
605 | ||||
|
606 | def test_execute_pyout(self): | |||
|
607 | """execute triggers pyout with silent=False""" | |||
|
608 | view = self.client[:] | |||
|
609 | ar = view.execute("5", silent=False, block=True) | |||
|
610 | self._wait_for(lambda : all(ar.pyout)) | |||
|
611 | ||||
|
612 | expected = [{'text/plain' : '5'}] * len(view) | |||
|
613 | mimes = [ out['data'] for out in ar.pyout ] | |||
|
614 | self.assertEquals(mimes, expected) | |||
|
615 | ||||
|
616 | def test_execute_silent(self): | |||
|
617 | """execute does not trigger pyout with silent=True""" | |||
|
618 | view = self.client[:] | |||
|
619 | ar = view.execute("5", block=True) | |||
|
620 | expected = [None] * len(view) | |||
|
621 | self.assertEquals(ar.pyout, expected) | |||
|
622 | ||||
|
623 | def test_execute_magic(self): | |||
|
624 | """execute accepts IPython commands""" | |||
|
625 | view = self.client[:] | |||
|
626 | view.execute("a = 5") | |||
|
627 | ar = view.execute("%whos", block=True) | |||
|
628 | # this will raise, if that failed | |||
|
629 | ar.get(5) | |||
|
630 | self._wait_for(lambda : all(ar.stdout)) | |||
|
631 | for stdout in ar.stdout: | |||
|
632 | lines = stdout.splitlines() | |||
|
633 | self.assertEquals(lines[0].split(), ['Variable', 'Type', 'Data/Info']) | |||
|
634 | found = False | |||
|
635 | for line in lines[2:]: | |||
|
636 | split = line.split() | |||
|
637 | if split == ['a', 'int', '5']: | |||
|
638 | found = True | |||
|
639 | break | |||
|
640 | self.assertTrue(found, "whos output wrong: %s" % stdout) | |||
|
641 | ||||
|
642 | def test_execute_displaypub(self): | |||
|
643 | """execute tracks display_pub output""" | |||
|
644 | view = self.client[:] | |||
|
645 | view.execute("from IPython.core.display import *") | |||
|
646 | ar = view.execute("[ display(i) for i in range(5) ]", block=True) | |||
|
647 | ||||
|
648 | self._wait_for(lambda : all(len(er.outputs) >= 5 for er in ar)) | |||
|
649 | expected = [ {u'text/plain' : unicode(j)} for j in range(5) ] | |||
|
650 | for outputs in ar.outputs: | |||
|
651 | mimes = [ out['data'] for out in outputs ] | |||
|
652 | self.assertEquals(mimes, expected) | |||
|
653 | ||||
|
654 | def test_apply_displaypub(self): | |||
|
655 | """apply tracks display_pub output""" | |||
|
656 | view = self.client[:] | |||
|
657 | view.execute("from IPython.core.display import *") | |||
|
658 | ||||
|
659 | @interactive | |||
|
660 | def publish(): | |||
|
661 | [ display(i) for i in range(5) ] | |||
|
662 | ||||
|
663 | ar = view.apply_async(publish) | |||
|
664 | ar.get(5) | |||
|
665 | self._wait_for(lambda : all(len(out) >= 5 for out in ar.outputs)) | |||
|
666 | expected = [ {u'text/plain' : unicode(j)} for j in range(5) ] | |||
|
667 | for outputs in ar.outputs: | |||
|
668 | mimes = [ out['data'] for out in outputs ] | |||
|
669 | self.assertEquals(mimes, expected) | |||
|
670 | ||||
|
671 | def test_execute_raises(self): | |||
|
672 | """exceptions in execute requests raise appropriately""" | |||
|
673 | view = self.client[-1] | |||
|
674 | ar = view.execute("1/0") | |||
|
675 | self.assertRaisesRemote(ZeroDivisionError, ar.get, 2) | |||
|
676 | ||||
|
677 | @dec.skipif_not_matplotlib | |||
|
678 | def test_magic_pylab(self): | |||
|
679 | """%pylab works on engines""" | |||
|
680 | view = self.client[-1] | |||
|
681 | ar = view.execute("%pylab inline") | |||
|
682 | # at least check if this raised: | |||
|
683 | reply = ar.get(5) | |||
|
684 | # include imports, in case user config | |||
|
685 | ar = view.execute("plot(rand(100))", silent=False) | |||
|
686 | reply = ar.get(5) | |||
|
687 | self._wait_for(lambda : all(ar.outputs)) | |||
|
688 | self.assertEquals(len(reply.outputs), 1) | |||
|
689 | output = reply.outputs[0] | |||
|
690 | self.assertTrue("data" in output) | |||
|
691 | data = output['data'] | |||
|
692 | self.assertTrue("image/png" in data) | |||
|
693 | ||||
|
694 |
@@ -47,6 +47,9 b' from IPython.utils import py3compat' | |||||
47 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence |
|
47 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence | |
48 | from IPython.utils.newserialized import serialize, unserialize |
|
48 | from IPython.utils.newserialized import serialize, unserialize | |
49 | from IPython.zmq.log import EnginePUBHandler |
|
49 | from IPython.zmq.log import EnginePUBHandler | |
|
50 | from IPython.zmq.serialize import ( | |||
|
51 | unserialize_object, serialize_object, pack_apply_message, unpack_apply_message | |||
|
52 | ) | |||
50 |
|
53 | |||
51 | if py3compat.PY3: |
|
54 | if py3compat.PY3: | |
52 | buffer = memoryview |
|
55 | buffer = memoryview | |
@@ -121,12 +124,6 b' def log_errors(f, self, *args, **kwargs):' | |||||
121 | self.log.error("Uncaught exception in %r" % f, exc_info=True) |
|
124 | self.log.error("Uncaught exception in %r" % f, exc_info=True) | |
122 |
|
125 | |||
123 |
|
126 | |||
124 | def asbytes(s): |
|
|||
125 | """ensure that an object is ascii bytes""" |
|
|||
126 | if isinstance(s, unicode): |
|
|||
127 | s = s.encode('ascii') |
|
|||
128 | return s |
|
|||
129 |
|
||||
130 | def is_url(url): |
|
127 | def is_url(url): | |
131 | """boolean check for whether a string is a zmq url""" |
|
128 | """boolean check for whether a string is a zmq url""" | |
132 | if '://' not in url: |
|
129 | if '://' not in url: | |
@@ -222,140 +219,6 b' def disambiguate_url(url, location=None):' | |||||
222 |
|
219 | |||
223 | return "%s://%s:%s"%(proto,ip,port) |
|
220 | return "%s://%s:%s"%(proto,ip,port) | |
224 |
|
221 | |||
225 | def serialize_object(obj, threshold=64e-6): |
|
|||
226 | """Serialize an object into a list of sendable buffers. |
|
|||
227 |
|
||||
228 | Parameters |
|
|||
229 | ---------- |
|
|||
230 |
|
||||
231 | obj : object |
|
|||
232 | The object to be serialized |
|
|||
233 | threshold : float |
|
|||
234 | The threshold for not double-pickling the content. |
|
|||
235 |
|
||||
236 |
|
||||
237 | Returns |
|
|||
238 | ------- |
|
|||
239 | ('pmd', [bufs]) : |
|
|||
240 | where pmd is the pickled metadata wrapper, |
|
|||
241 | bufs is a list of data buffers |
|
|||
242 | """ |
|
|||
243 | databuffers = [] |
|
|||
244 | if isinstance(obj, (list, tuple)): |
|
|||
245 | clist = canSequence(obj) |
|
|||
246 | slist = map(serialize, clist) |
|
|||
247 | for s in slist: |
|
|||
248 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
249 | databuffers.append(s.getData()) |
|
|||
250 | s.data = None |
|
|||
251 | return pickle.dumps(slist,-1), databuffers |
|
|||
252 | elif isinstance(obj, dict): |
|
|||
253 | sobj = {} |
|
|||
254 | for k in sorted(obj.iterkeys()): |
|
|||
255 | s = serialize(can(obj[k])) |
|
|||
256 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
257 | databuffers.append(s.getData()) |
|
|||
258 | s.data = None |
|
|||
259 | sobj[k] = s |
|
|||
260 | return pickle.dumps(sobj,-1),databuffers |
|
|||
261 | else: |
|
|||
262 | s = serialize(can(obj)) |
|
|||
263 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
264 | databuffers.append(s.getData()) |
|
|||
265 | s.data = None |
|
|||
266 | return pickle.dumps(s,-1),databuffers |
|
|||
267 |
|
||||
268 |
|
||||
269 | def unserialize_object(bufs): |
|
|||
270 | """reconstruct an object serialized by serialize_object from data buffers.""" |
|
|||
271 | bufs = list(bufs) |
|
|||
272 | sobj = pickle.loads(bufs.pop(0)) |
|
|||
273 | if isinstance(sobj, (list, tuple)): |
|
|||
274 | for s in sobj: |
|
|||
275 | if s.data is None: |
|
|||
276 | s.data = bufs.pop(0) |
|
|||
277 | return uncanSequence(map(unserialize, sobj)), bufs |
|
|||
278 | elif isinstance(sobj, dict): |
|
|||
279 | newobj = {} |
|
|||
280 | for k in sorted(sobj.iterkeys()): |
|
|||
281 | s = sobj[k] |
|
|||
282 | if s.data is None: |
|
|||
283 | s.data = bufs.pop(0) |
|
|||
284 | newobj[k] = uncan(unserialize(s)) |
|
|||
285 | return newobj, bufs |
|
|||
286 | else: |
|
|||
287 | if sobj.data is None: |
|
|||
288 | sobj.data = bufs.pop(0) |
|
|||
289 | return uncan(unserialize(sobj)), bufs |
|
|||
290 |
|
||||
291 | def pack_apply_message(f, args, kwargs, threshold=64e-6): |
|
|||
292 | """pack up a function, args, and kwargs to be sent over the wire |
|
|||
293 | as a series of buffers. Any object whose data is larger than `threshold` |
|
|||
294 | will not have their data copied (currently only numpy arrays support zero-copy)""" |
|
|||
295 | msg = [pickle.dumps(can(f),-1)] |
|
|||
296 | databuffers = [] # for large objects |
|
|||
297 | sargs, bufs = serialize_object(args,threshold) |
|
|||
298 | msg.append(sargs) |
|
|||
299 | databuffers.extend(bufs) |
|
|||
300 | skwargs, bufs = serialize_object(kwargs,threshold) |
|
|||
301 | msg.append(skwargs) |
|
|||
302 | databuffers.extend(bufs) |
|
|||
303 | msg.extend(databuffers) |
|
|||
304 | return msg |
|
|||
305 |
|
||||
306 | def unpack_apply_message(bufs, g=None, copy=True): |
|
|||
307 | """unpack f,args,kwargs from buffers packed by pack_apply_message() |
|
|||
308 | Returns: original f,args,kwargs""" |
|
|||
309 | bufs = list(bufs) # allow us to pop |
|
|||
310 | assert len(bufs) >= 3, "not enough buffers!" |
|
|||
311 | if not copy: |
|
|||
312 | for i in range(3): |
|
|||
313 | bufs[i] = bufs[i].bytes |
|
|||
314 | cf = pickle.loads(bufs.pop(0)) |
|
|||
315 | sargs = list(pickle.loads(bufs.pop(0))) |
|
|||
316 | skwargs = dict(pickle.loads(bufs.pop(0))) |
|
|||
317 | # print sargs, skwargs |
|
|||
318 | f = uncan(cf, g) |
|
|||
319 | for sa in sargs: |
|
|||
320 | if sa.data is None: |
|
|||
321 | m = bufs.pop(0) |
|
|||
322 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): |
|
|||
323 | # always use a buffer, until memoryviews get sorted out |
|
|||
324 | sa.data = buffer(m) |
|
|||
325 | # disable memoryview support |
|
|||
326 | # if copy: |
|
|||
327 | # sa.data = buffer(m) |
|
|||
328 | # else: |
|
|||
329 | # sa.data = m.buffer |
|
|||
330 | else: |
|
|||
331 | if copy: |
|
|||
332 | sa.data = m |
|
|||
333 | else: |
|
|||
334 | sa.data = m.bytes |
|
|||
335 |
|
||||
336 | args = uncanSequence(map(unserialize, sargs), g) |
|
|||
337 | kwargs = {} |
|
|||
338 | for k in sorted(skwargs.iterkeys()): |
|
|||
339 | sa = skwargs[k] |
|
|||
340 | if sa.data is None: |
|
|||
341 | m = bufs.pop(0) |
|
|||
342 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): |
|
|||
343 | # always use a buffer, until memoryviews get sorted out |
|
|||
344 | sa.data = buffer(m) |
|
|||
345 | # disable memoryview support |
|
|||
346 | # if copy: |
|
|||
347 | # sa.data = buffer(m) |
|
|||
348 | # else: |
|
|||
349 | # sa.data = m.buffer |
|
|||
350 | else: |
|
|||
351 | if copy: |
|
|||
352 | sa.data = m |
|
|||
353 | else: |
|
|||
354 | sa.data = m.bytes |
|
|||
355 |
|
||||
356 | kwargs[k] = uncan(unserialize(sa), g) |
|
|||
357 |
|
||||
358 | return f,args,kwargs |
|
|||
359 |
|
222 | |||
360 | #-------------------------------------------------------------------------- |
|
223 | #-------------------------------------------------------------------------- | |
361 | # helpers for implementing old MEC API via view.apply |
|
224 | # helpers for implementing old MEC API via view.apply |
@@ -43,6 +43,7 b' class ZMQShellDisplayHook(DisplayHook):' | |||||
43 | """A displayhook subclass that publishes data using ZeroMQ. This is intended |
|
43 | """A displayhook subclass that publishes data using ZeroMQ. This is intended | |
44 | to work with an InteractiveShell instance. It sends a dict of different |
|
44 | to work with an InteractiveShell instance. It sends a dict of different | |
45 | representations of the object.""" |
|
45 | representations of the object.""" | |
|
46 | topic=None | |||
46 |
|
47 | |||
47 | session = Instance(Session) |
|
48 | session = Instance(Session) | |
48 | pub_socket = Instance('zmq.Socket') |
|
49 | pub_socket = Instance('zmq.Socket') | |
@@ -67,6 +68,6 b' class ZMQShellDisplayHook(DisplayHook):' | |||||
67 | """Finish up all displayhook activities.""" |
|
68 | """Finish up all displayhook activities.""" | |
68 | sys.stdout.flush() |
|
69 | sys.stdout.flush() | |
69 | sys.stderr.flush() |
|
70 | sys.stderr.flush() | |
70 | self.session.send(self.pub_socket, self.msg) |
|
71 | self.session.send(self.pub_socket, self.msg, ident=self.topic) | |
71 | self.msg = None |
|
72 | self.msg = None | |
72 |
|
73 |
@@ -20,8 +20,10 b' import sys' | |||||
20 | import zmq |
|
20 | import zmq | |
21 |
|
21 | |||
22 | # Local imports. |
|
22 | # Local imports. | |
|
23 | from IPython.config.application import Application | |||
23 | from IPython.utils import io |
|
24 | from IPython.utils import io | |
24 |
|
25 | |||
|
26 | ||||
25 | #------------------------------------------------------------------------------ |
|
27 | #------------------------------------------------------------------------------ | |
26 | # Eventloops for integrating the Kernel into different GUIs |
|
28 | # Eventloops for integrating the Kernel into different GUIs | |
27 | #------------------------------------------------------------------------------ |
|
29 | #------------------------------------------------------------------------------ | |
@@ -164,7 +166,10 b' def loop_cocoa(kernel):' | |||||
164 | # but still need a Poller for when there are no active windows, |
|
166 | # but still need a Poller for when there are no active windows, | |
165 | # during which time mainloop() returns immediately |
|
167 | # during which time mainloop() returns immediately | |
166 | poller = zmq.Poller() |
|
168 | poller = zmq.Poller() | |
167 | poller.register(kernel.shell_socket, zmq.POLLIN) |
|
169 | if kernel.control_stream: | |
|
170 | poller.register(kernel.control_stream.socket, zmq.POLLIN) | |||
|
171 | for stream in kernel.shell_streams: | |||
|
172 | poller.register(stream.socket, zmq.POLLIN) | |||
168 |
|
173 | |||
169 | while True: |
|
174 | while True: | |
170 | try: |
|
175 | try: | |
@@ -203,11 +208,15 b' loop_map = {' | |||||
203 |
|
208 | |||
204 | def enable_gui(gui, kernel=None): |
|
209 | def enable_gui(gui, kernel=None): | |
205 | """Enable integration with a given GUI""" |
|
210 | """Enable integration with a given GUI""" | |
206 | if kernel is None: |
|
|||
207 | from .ipkernel import IPKernelApp |
|
|||
208 | kernel = IPKernelApp.instance().kernel |
|
|||
209 | if gui not in loop_map: |
|
211 | if gui not in loop_map: | |
210 | raise ValueError("GUI %r not supported" % gui) |
|
212 | raise ValueError("GUI %r not supported" % gui) | |
|
213 | if kernel is None: | |||
|
214 | if Application.initialized(): | |||
|
215 | kernel = getattr(Application.instance(), 'kernel', None) | |||
|
216 | if kernel is None: | |||
|
217 | raise RuntimeError("You didn't specify a kernel," | |||
|
218 | " and no IPython Application with a kernel appears to be running." | |||
|
219 | ) | |||
211 | loop = loop_map[gui] |
|
220 | loop = loop_map[gui] | |
212 | if kernel.eventloop is not None and kernel.eventloop is not loop: |
|
221 | if kernel.eventloop is not None and kernel.eventloop is not loop: | |
213 | raise RuntimeError("Cannot activate multiple GUI eventloops") |
|
222 | raise RuntimeError("Cannot activate multiple GUI eventloops") |
@@ -15,20 +15,26 b' Things to do:' | |||||
15 | #----------------------------------------------------------------------------- |
|
15 | #----------------------------------------------------------------------------- | |
16 | from __future__ import print_function |
|
16 | from __future__ import print_function | |
17 |
|
17 | |||
18 |
# Standard library imports |
|
18 | # Standard library imports | |
19 | import __builtin__ |
|
19 | import __builtin__ | |
20 | import atexit |
|
20 | import atexit | |
21 | import sys |
|
21 | import sys | |
22 | import time |
|
22 | import time | |
23 | import traceback |
|
23 | import traceback | |
24 | import logging |
|
24 | import logging | |
|
25 | import uuid | |||
|
26 | ||||
|
27 | from datetime import datetime | |||
25 | from signal import ( |
|
28 | from signal import ( | |
26 | signal, default_int_handler, SIGINT, SIG_IGN |
|
29 | signal, getsignal, default_int_handler, SIGINT, SIG_IGN | |
27 | ) |
|
30 | ) | |
28 | # System library imports. |
|
31 | ||
|
32 | # System library imports | |||
29 | import zmq |
|
33 | import zmq | |
|
34 | from zmq.eventloop import ioloop | |||
|
35 | from zmq.eventloop.zmqstream import ZMQStream | |||
30 |
|
36 | |||
31 |
# Local imports |
|
37 | # Local imports | |
32 | from IPython.core import pylabtools |
|
38 | from IPython.core import pylabtools | |
33 | from IPython.config.configurable import Configurable |
|
39 | from IPython.config.configurable import Configurable | |
34 | from IPython.config.application import boolean_flag, catch_config_error |
|
40 | from IPython.config.application import boolean_flag, catch_config_error | |
@@ -42,11 +48,12 b' from IPython.utils import py3compat' | |||||
42 | from IPython.utils.frame import extract_module_locals |
|
48 | from IPython.utils.frame import extract_module_locals | |
43 | from IPython.utils.jsonutil import json_clean |
|
49 | from IPython.utils.jsonutil import json_clean | |
44 | from IPython.utils.traitlets import ( |
|
50 | from IPython.utils.traitlets import ( | |
45 | Any, Instance, Float, Dict, CaselessStrEnum |
|
51 | Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode | |
46 | ) |
|
52 | ) | |
47 |
|
53 | |||
48 | from entry_point import base_launch_kernel |
|
54 | from entry_point import base_launch_kernel | |
49 | from kernelapp import KernelApp, kernel_flags, kernel_aliases |
|
55 | from kernelapp import KernelApp, kernel_flags, kernel_aliases | |
|
56 | from serialize import serialize_object, unpack_apply_message | |||
50 | from session import Session, Message |
|
57 | from session import Session, Message | |
51 | from zmqshell import ZMQInteractiveShell |
|
58 | from zmqshell import ZMQInteractiveShell | |
52 |
|
59 | |||
@@ -63,16 +70,21 b' class Kernel(Configurable):' | |||||
63 |
|
70 | |||
64 | # attribute to override with a GUI |
|
71 | # attribute to override with a GUI | |
65 | eventloop = Any(None) |
|
72 | eventloop = Any(None) | |
|
73 | def _eventloop_changed(self, name, old, new): | |||
|
74 | """schedule call to eventloop from IOLoop""" | |||
|
75 | loop = ioloop.IOLoop.instance() | |||
|
76 | loop.add_timeout(time.time()+0.1, self.enter_eventloop) | |||
66 |
|
77 | |||
67 | shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') |
|
78 | shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') | |
68 | session = Instance(Session) |
|
79 | session = Instance(Session) | |
69 | profile_dir = Instance('IPython.core.profiledir.ProfileDir') |
|
80 | profile_dir = Instance('IPython.core.profiledir.ProfileDir') | |
70 | shell_socket = Instance('zmq.Socket') |
|
81 | shell_streams = List() | |
71 | iopub_socket = Instance('zmq.Socket') |
|
82 | control_stream = Instance(ZMQStream) | |
72 |
|
|
83 | iopub_socket = Instance(zmq.Socket) | |
|
84 | stdin_socket = Instance(zmq.Socket) | |||
73 | log = Instance(logging.Logger) |
|
85 | log = Instance(logging.Logger) | |
74 |
|
86 | |||
75 |
user_module = |
|
87 | user_module = Any() | |
76 | def _user_module_changed(self, name, old, new): |
|
88 | def _user_module_changed(self, name, old, new): | |
77 | if self.shell is not None: |
|
89 | if self.shell is not None: | |
78 | self.shell.user_module = new |
|
90 | self.shell.user_module = new | |
@@ -83,6 +95,14 b' class Kernel(Configurable):' | |||||
83 | self.shell.user_ns = new |
|
95 | self.shell.user_ns = new | |
84 | self.shell.init_user_ns() |
|
96 | self.shell.init_user_ns() | |
85 |
|
97 | |||
|
98 | # identities: | |||
|
99 | int_id = Integer(-1) | |||
|
100 | ident = Unicode() | |||
|
101 | ||||
|
102 | def _ident_default(self): | |||
|
103 | return unicode(uuid.uuid4()) | |||
|
104 | ||||
|
105 | ||||
86 | # Private interface |
|
106 | # Private interface | |
87 |
|
107 | |||
88 | # Time to sleep after flushing the stdout/err buffers in each execute |
|
108 | # Time to sleep after flushing the stdout/err buffers in each execute | |
@@ -110,15 +130,13 b' class Kernel(Configurable):' | |||||
110 | # by record_ports and used by connect_request. |
|
130 | # by record_ports and used by connect_request. | |
111 | _recorded_ports = Dict() |
|
131 | _recorded_ports = Dict() | |
112 |
|
132 | |||
|
133 | # set of aborted msg_ids | |||
|
134 | aborted = Set() | |||
113 |
|
135 | |||
114 |
|
136 | |||
115 | def __init__(self, **kwargs): |
|
137 | def __init__(self, **kwargs): | |
116 | super(Kernel, self).__init__(**kwargs) |
|
138 | super(Kernel, self).__init__(**kwargs) | |
117 |
|
139 | |||
118 | # Before we even start up the shell, register *first* our exit handlers |
|
|||
119 | # so they come before the shell's |
|
|||
120 | atexit.register(self._at_shutdown) |
|
|||
121 |
|
||||
122 | # Initialize the InteractiveShell subclass |
|
140 | # Initialize the InteractiveShell subclass | |
123 | self.shell = ZMQInteractiveShell.instance(config=self.config, |
|
141 | self.shell = ZMQInteractiveShell.instance(config=self.config, | |
124 | profile_dir = self.profile_dir, |
|
142 | profile_dir = self.profile_dir, | |
@@ -127,6 +145,7 b' class Kernel(Configurable):' | |||||
127 | ) |
|
145 | ) | |
128 | self.shell.displayhook.session = self.session |
|
146 | self.shell.displayhook.session = self.session | |
129 | self.shell.displayhook.pub_socket = self.iopub_socket |
|
147 | self.shell.displayhook.pub_socket = self.iopub_socket | |
|
148 | self.shell.displayhook.topic = self._topic('pyout') | |||
130 | self.shell.display_pub.session = self.session |
|
149 | self.shell.display_pub.session = self.session | |
131 | self.shell.display_pub.pub_socket = self.iopub_socket |
|
150 | self.shell.display_pub.pub_socket = self.iopub_socket | |
132 |
|
151 | |||
@@ -136,96 +155,131 b' class Kernel(Configurable):' | |||||
136 | # Build dict of handlers for message types |
|
155 | # Build dict of handlers for message types | |
137 | msg_types = [ 'execute_request', 'complete_request', |
|
156 | msg_types = [ 'execute_request', 'complete_request', | |
138 | 'object_info_request', 'history_request', |
|
157 | 'object_info_request', 'history_request', | |
139 |
'connect_request', 'shutdown_request' |
|
158 | 'connect_request', 'shutdown_request', | |
140 | self.handlers = {} |
|
159 | 'apply_request', | |
|
160 | ] | |||
|
161 | self.shell_handlers = {} | |||
141 | for msg_type in msg_types: |
|
162 | for msg_type in msg_types: | |
142 | self.handlers[msg_type] = getattr(self, msg_type) |
|
163 | self.shell_handlers[msg_type] = getattr(self, msg_type) | |
143 |
|
164 | |||
144 | def do_one_iteration(self): |
|
165 | control_msg_types = msg_types + [ 'clear_request', 'abort_request' ] | |
145 | """Do one iteration of the kernel's evaluation loop. |
|
166 | self.control_handlers = {} | |
146 | """ |
|
167 | for msg_type in control_msg_types: | |
|
168 | self.control_handlers[msg_type] = getattr(self, msg_type) | |||
|
169 | ||||
|
170 | def dispatch_control(self, msg): | |||
|
171 | """dispatch control requests""" | |||
|
172 | idents,msg = self.session.feed_identities(msg, copy=False) | |||
147 | try: |
|
173 | try: | |
148 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) |
|
174 | msg = self.session.unserialize(msg, content=True, copy=False) | |
149 |
except |
|
175 | except: | |
150 |
self.log. |
|
176 | self.log.error("Invalid Control Message", exc_info=True) | |
151 | return |
|
177 | return | |
152 | if msg is None: |
|
178 | ||
|
179 | self.log.debug("Control received: %s", msg) | |||
|
180 | ||||
|
181 | header = msg['header'] | |||
|
182 | msg_id = header['msg_id'] | |||
|
183 | msg_type = header['msg_type'] | |||
|
184 | ||||
|
185 | handler = self.control_handlers.get(msg_type, None) | |||
|
186 | if handler is None: | |||
|
187 | self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) | |||
|
188 | else: | |||
|
189 | try: | |||
|
190 | handler(self.control_stream, idents, msg) | |||
|
191 | except Exception: | |||
|
192 | self.log.error("Exception in control handler:", exc_info=True) | |||
|
193 | ||||
|
194 | def dispatch_shell(self, stream, msg): | |||
|
195 | """dispatch shell requests""" | |||
|
196 | # flush control requests first | |||
|
197 | if self.control_stream: | |||
|
198 | self.control_stream.flush() | |||
|
199 | ||||
|
200 | idents,msg = self.session.feed_identities(msg, copy=False) | |||
|
201 | try: | |||
|
202 | msg = self.session.unserialize(msg, content=True, copy=False) | |||
|
203 | except: | |||
|
204 | self.log.error("Invalid Message", exc_info=True) | |||
153 | return |
|
205 | return | |
154 |
|
206 | |||
|
207 | header = msg['header'] | |||
|
208 | msg_id = header['msg_id'] | |||
155 | msg_type = msg['header']['msg_type'] |
|
209 | msg_type = msg['header']['msg_type'] | |
156 |
|
210 | |||
157 | # This assert will raise in versions of zeromq 2.0.7 and lesser. |
|
|||
158 | # We now require 2.0.8 or above, so we can uncomment for safety. |
|
|||
159 | # print(ident,msg, file=sys.__stdout__) |
|
|||
160 | assert ident is not None, "Missing message part." |
|
|||
161 |
|
||||
162 | # Print some info about this message and leave a '--->' marker, so it's |
|
211 | # Print some info about this message and leave a '--->' marker, so it's | |
163 | # easier to trace visually the message chain when debugging. Each |
|
212 | # easier to trace visually the message chain when debugging. Each | |
164 | # handler prints its message at the end. |
|
213 | # handler prints its message at the end. | |
165 |
self.log.debug('\n*** MESSAGE TYPE:' |
|
214 | self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) | |
166 |
self.log.debug(' Content: ' |
|
215 | self.log.debug(' Content: %s\n --->\n ', msg['content']) | |
|
216 | ||||
|
217 | if msg_id in self.aborted: | |||
|
218 | self.aborted.remove(msg_id) | |||
|
219 | # is it safe to assume a msg_id will not be resubmitted? | |||
|
220 | reply_type = msg_type.split('_')[0] + '_reply' | |||
|
221 | status = {'status' : 'aborted'} | |||
|
222 | sub = {'engine' : self.ident} | |||
|
223 | sub.update(status) | |||
|
224 | reply_msg = self.session.send(stream, reply_type, subheader=sub, | |||
|
225 | content=status, parent=msg, ident=idents) | |||
|
226 | return | |||
167 |
|
227 | |||
168 | # Find and call actual handler for message |
|
228 | handler = self.shell_handlers.get(msg_type, None) | |
169 | handler = self.handlers.get(msg_type, None) |
|
|||
170 | if handler is None: |
|
229 | if handler is None: | |
171 |
self.log.error("UNKNOWN MESSAGE TYPE:" |
|
230 | self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type) | |
172 | else: |
|
231 | else: | |
173 | handler(ident, msg) |
|
232 | # ensure default_int_handler during handler call | |
174 |
|
233 | sig = signal(SIGINT, default_int_handler) | ||
175 | # Check whether we should exit, in case the incoming message set the |
|
|||
176 | # exit flag on |
|
|||
177 | if self.shell.exit_now: |
|
|||
178 | self.log.debug('\nExiting IPython kernel...') |
|
|||
179 | # We do a normal, clean exit, which allows any actions registered |
|
|||
180 | # via atexit (such as history saving) to take place. |
|
|||
181 | sys.exit(0) |
|
|||
182 |
|
||||
183 |
|
||||
184 | def start(self): |
|
|||
185 | """ Start the kernel main loop. |
|
|||
186 | """ |
|
|||
187 | # a KeyboardInterrupt (SIGINT) can occur on any python statement, so |
|
|||
188 | # let's ignore (SIG_IGN) them until we're in a place to handle them properly |
|
|||
189 | signal(SIGINT,SIG_IGN) |
|
|||
190 | poller = zmq.Poller() |
|
|||
191 | poller.register(self.shell_socket, zmq.POLLIN) |
|
|||
192 | # loop while self.eventloop has not been overridden |
|
|||
193 | while self.eventloop is None: |
|
|||
194 | try: |
|
|||
195 | # scale by extra factor of 10, because there is no |
|
|||
196 | # reason for this to be anything less than ~ 0.1s |
|
|||
197 | # since it is a real poller and will respond |
|
|||
198 | # to events immediately |
|
|||
199 |
|
||||
200 | # double nested try/except, to properly catch KeyboardInterrupt |
|
|||
201 | # due to pyzmq Issue #130 |
|
|||
202 |
|
|
234 | try: | |
203 | poller.poll(10*1000*self._poll_interval) |
|
235 | handler(stream, idents, msg) | |
204 | # restore raising of KeyboardInterrupt |
|
236 | except Exception: | |
205 | signal(SIGINT, default_int_handler) |
|
237 | self.log.error("Exception in message handler:", exc_info=True) | |
206 | self.do_one_iteration() |
|
|||
207 | except: |
|
|||
208 | raise |
|
|||
209 |
|
|
238 | finally: | |
210 | # prevent raising of KeyboardInterrupt |
|
239 | signal(SIGINT, sig) | |
211 | signal(SIGINT,SIG_IGN) |
|
240 | ||
212 | except KeyboardInterrupt: |
|
241 | def enter_eventloop(self): | |
213 | # Ctrl-C shouldn't crash the kernel |
|
242 | """enter eventloop""" | |
214 | io.raw_print("KeyboardInterrupt caught in kernel") |
|
243 | self.log.info("entering eventloop") | |
215 | # stop ignoring sigint, now that we are out of our own loop, |
|
244 | # restore default_int_handler | |
216 | # we don't want to prevent future code from handling it |
|
|||
217 | signal(SIGINT, default_int_handler) |
|
245 | signal(SIGINT, default_int_handler) | |
218 | while self.eventloop is not None: |
|
246 | while self.eventloop is not None: | |
219 | try: |
|
247 | try: | |
220 | self.eventloop(self) |
|
248 | self.eventloop(self) | |
221 | except KeyboardInterrupt: |
|
249 | except KeyboardInterrupt: | |
222 | # Ctrl-C shouldn't crash the kernel |
|
250 | # Ctrl-C shouldn't crash the kernel | |
223 |
|
|
251 | self.log.error("KeyboardInterrupt caught in kernel") | |
224 | continue |
|
252 | continue | |
225 | else: |
|
253 | else: | |
226 | # eventloop exited cleanly, this means we should stop (right?) |
|
254 | # eventloop exited cleanly, this means we should stop (right?) | |
227 | self.eventloop = None |
|
255 | self.eventloop = None | |
228 | break |
|
256 | break | |
|
257 | self.log.info("exiting eventloop") | |||
|
258 | # if eventloop exits, IOLoop should stop | |||
|
259 | ioloop.IOLoop.instance().stop() | |||
|
260 | ||||
|
261 | def start(self): | |||
|
262 | """register dispatchers for streams""" | |||
|
263 | self.shell.exit_now = False | |||
|
264 | if self.control_stream: | |||
|
265 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |||
|
266 | ||||
|
267 | def make_dispatcher(stream): | |||
|
268 | def dispatcher(msg): | |||
|
269 | return self.dispatch_shell(stream, msg) | |||
|
270 | return dispatcher | |||
|
271 | ||||
|
272 | for s in self.shell_streams: | |||
|
273 | s.on_recv(make_dispatcher(s), copy=False) | |||
|
274 | ||||
|
275 | def do_one_iteration(self): | |||
|
276 | """step eventloop just once""" | |||
|
277 | if self.control_stream: | |||
|
278 | self.control_stream.flush() | |||
|
279 | for stream in self.shell_streams: | |||
|
280 | # handle at most one request per iteration | |||
|
281 | stream.flush(zmq.POLLIN, 1) | |||
|
282 | stream.flush(zmq.POLLOUT) | |||
229 |
|
283 | |||
230 |
|
284 | |||
231 | def record_ports(self, ports): |
|
285 | def record_ports(self, ports): | |
@@ -240,18 +294,30 b' class Kernel(Configurable):' | |||||
240 | # Kernel request handlers |
|
294 | # Kernel request handlers | |
241 | #--------------------------------------------------------------------------- |
|
295 | #--------------------------------------------------------------------------- | |
242 |
|
296 | |||
|
297 | def _make_subheader(self): | |||
|
298 | """init subheader dict, for execute/apply_reply""" | |||
|
299 | return { | |||
|
300 | 'dependencies_met' : True, | |||
|
301 | 'engine' : self.ident, | |||
|
302 | 'started': datetime.now(), | |||
|
303 | } | |||
|
304 | ||||
243 | def _publish_pyin(self, code, parent, execution_count): |
|
305 | def _publish_pyin(self, code, parent, execution_count): | |
244 | """Publish the code request on the pyin stream.""" |
|
306 | """Publish the code request on the pyin stream.""" | |
245 |
|
307 | |||
246 |
self.session.send(self.iopub_socket, u'pyin', |
|
308 | self.session.send(self.iopub_socket, u'pyin', | |
247 |
u'execution_count': execution_count}, |
|
309 | {u'code':code, u'execution_count': execution_count}, | |
|
310 | parent=parent, ident=self._topic('pyin') | |||
|
311 | ) | |||
248 |
|
312 | |||
249 | def execute_request(self, ident, parent): |
|
313 | def execute_request(self, stream, ident, parent): | |
250 |
|
314 | |||
251 | self.session.send(self.iopub_socket, |
|
315 | self.session.send(self.iopub_socket, | |
252 | u'status', |
|
316 | u'status', | |
253 | {u'execution_state':u'busy'}, |
|
317 | {u'execution_state':u'busy'}, | |
254 |
parent=parent |
|
318 | parent=parent, | |
|
319 | ident=self._topic('status'), | |||
|
320 | ) | |||
255 |
|
321 | |||
256 | try: |
|
322 | try: | |
257 | content = parent[u'content'] |
|
323 | content = parent[u'content'] | |
@@ -259,9 +325,11 b' class Kernel(Configurable):' | |||||
259 | silent = content[u'silent'] |
|
325 | silent = content[u'silent'] | |
260 | except: |
|
326 | except: | |
261 | self.log.error("Got bad msg: ") |
|
327 | self.log.error("Got bad msg: ") | |
262 |
self.log.error( |
|
328 | self.log.error("%s", parent) | |
263 | return |
|
329 | return | |
264 |
|
330 | |||
|
331 | sub = self._make_subheader() | |||
|
332 | ||||
265 | shell = self.shell # we'll need this a lot here |
|
333 | shell = self.shell # we'll need this a lot here | |
266 |
|
334 | |||
267 | # Replace raw_input. Note that is not sufficient to replace |
|
335 | # Replace raw_input. Note that is not sufficient to replace | |
@@ -289,14 +357,8 b' class Kernel(Configurable):' | |||||
289 |
|
357 | |||
290 | reply_content = {} |
|
358 | reply_content = {} | |
291 | try: |
|
359 | try: | |
292 | if silent: |
|
|||
293 | # run_code uses 'exec' mode, so no displayhook will fire, and it |
|
|||
294 | # doesn't call logging or history manipulations. Print |
|
|||
295 | # statements in that code will obviously still execute. |
|
|||
296 | shell.run_code(code) |
|
|||
297 | else: |
|
|||
298 |
|
|
360 | # FIXME: the shell calls the exception handler itself. | |
299 |
|
|
361 | shell.run_cell(code, store_history=not silent, silent=silent) | |
300 | except: |
|
362 | except: | |
301 | status = u'error' |
|
363 | status = u'error' | |
302 | # FIXME: this code right now isn't being used yet by default, |
|
364 | # FIXME: this code right now isn't being used yet by default, | |
@@ -327,9 +389,9 b' class Kernel(Configurable):' | |||||
327 | # or not. If it did, we proceed to evaluate user_variables/expressions |
|
389 | # or not. If it did, we proceed to evaluate user_variables/expressions | |
328 | if reply_content['status'] == 'ok': |
|
390 | if reply_content['status'] == 'ok': | |
329 | reply_content[u'user_variables'] = \ |
|
391 | reply_content[u'user_variables'] = \ | |
330 |
shell.user_variables(content |
|
392 | shell.user_variables(content.get(u'user_variables', [])) | |
331 | reply_content[u'user_expressions'] = \ |
|
393 | reply_content[u'user_expressions'] = \ | |
332 |
shell.user_expressions(content |
|
394 | shell.user_expressions(content.get(u'user_expressions', {})) | |
333 | else: |
|
395 | else: | |
334 | # If there was an error, don't even try to compute variables or |
|
396 | # If there was an error, don't even try to compute variables or | |
335 | # expressions |
|
397 | # expressions | |
@@ -355,40 +417,49 b' class Kernel(Configurable):' | |||||
355 |
|
417 | |||
356 | # Send the reply. |
|
418 | # Send the reply. | |
357 | reply_content = json_clean(reply_content) |
|
419 | reply_content = json_clean(reply_content) | |
358 | reply_msg = self.session.send(self.shell_socket, u'execute_reply', |
|
|||
359 | reply_content, parent, ident=ident) |
|
|||
360 | self.log.debug(str(reply_msg)) |
|
|||
361 |
|
420 | |||
362 | if reply_msg['content']['status'] == u'error': |
|
421 | sub['status'] = reply_content['status'] | |
363 | self._abort_queue() |
|
422 | if reply_content['status'] == 'error' and \ | |
|
423 | reply_content['ename'] == 'UnmetDependency': | |||
|
424 | sub['dependencies_met'] = False | |||
|
425 | ||||
|
426 | reply_msg = self.session.send(stream, u'execute_reply', | |||
|
427 | reply_content, parent, subheader=sub, | |||
|
428 | ident=ident) | |||
|
429 | ||||
|
430 | self.log.debug("%s", reply_msg) | |||
|
431 | ||||
|
432 | if not silent and reply_msg['content']['status'] == u'error': | |||
|
433 | self._abort_queues() | |||
364 |
|
434 | |||
365 | self.session.send(self.iopub_socket, |
|
435 | self.session.send(self.iopub_socket, | |
366 | u'status', |
|
436 | u'status', | |
367 | {u'execution_state':u'idle'}, |
|
437 | {u'execution_state':u'idle'}, | |
368 |
parent=parent |
|
438 | parent=parent, | |
|
439 | ident=self._topic('status')) | |||
369 |
|
440 | |||
370 | def complete_request(self, ident, parent): |
|
441 | def complete_request(self, stream, ident, parent): | |
371 | txt, matches = self._complete(parent) |
|
442 | txt, matches = self._complete(parent) | |
372 | matches = {'matches' : matches, |
|
443 | matches = {'matches' : matches, | |
373 | 'matched_text' : txt, |
|
444 | 'matched_text' : txt, | |
374 | 'status' : 'ok'} |
|
445 | 'status' : 'ok'} | |
375 | matches = json_clean(matches) |
|
446 | matches = json_clean(matches) | |
376 |
completion_msg = self.session.send(s |
|
447 | completion_msg = self.session.send(stream, 'complete_reply', | |
377 | matches, parent, ident) |
|
448 | matches, parent, ident) | |
378 |
self.log.debug( |
|
449 | self.log.debug("%s", completion_msg) | |
379 |
|
450 | |||
380 | def object_info_request(self, ident, parent): |
|
451 | def object_info_request(self, stream, ident, parent): | |
381 | content = parent['content'] |
|
452 | content = parent['content'] | |
382 | object_info = self.shell.object_inspect(content['oname'], |
|
453 | object_info = self.shell.object_inspect(content['oname'], | |
383 | detail_level = content.get('detail_level', 0) |
|
454 | detail_level = content.get('detail_level', 0) | |
384 | ) |
|
455 | ) | |
385 | # Before we send this object over, we scrub it for JSON usage |
|
456 | # Before we send this object over, we scrub it for JSON usage | |
386 | oinfo = json_clean(object_info) |
|
457 | oinfo = json_clean(object_info) | |
387 |
msg = self.session.send(s |
|
458 | msg = self.session.send(stream, 'object_info_reply', | |
388 | oinfo, parent, ident) |
|
459 | oinfo, parent, ident) | |
389 | self.log.debug(msg) |
|
460 | self.log.debug("%s", msg) | |
390 |
|
461 | |||
391 | def history_request(self, ident, parent): |
|
462 | def history_request(self, stream, ident, parent): | |
392 | # We need to pull these out, as passing **kwargs doesn't work with |
|
463 | # We need to pull these out, as passing **kwargs doesn't work with | |
393 | # unicode keys before Python 2.6.5. |
|
464 | # unicode keys before Python 2.6.5. | |
394 | hist_access_type = parent['content']['hist_access_type'] |
|
465 | hist_access_type = parent['content']['hist_access_type'] | |
@@ -416,51 +487,183 b' class Kernel(Configurable):' | |||||
416 | hist = list(hist) |
|
487 | hist = list(hist) | |
417 | content = {'history' : hist} |
|
488 | content = {'history' : hist} | |
418 | content = json_clean(content) |
|
489 | content = json_clean(content) | |
419 |
msg = self.session.send(s |
|
490 | msg = self.session.send(stream, 'history_reply', | |
420 | content, parent, ident) |
|
491 | content, parent, ident) | |
421 | self.log.debug("Sending history reply with %i entries", len(hist)) |
|
492 | self.log.debug("Sending history reply with %i entries", len(hist)) | |
422 |
|
493 | |||
423 | def connect_request(self, ident, parent): |
|
494 | def connect_request(self, stream, ident, parent): | |
424 | if self._recorded_ports is not None: |
|
495 | if self._recorded_ports is not None: | |
425 | content = self._recorded_ports.copy() |
|
496 | content = self._recorded_ports.copy() | |
426 | else: |
|
497 | else: | |
427 | content = {} |
|
498 | content = {} | |
428 |
msg = self.session.send(s |
|
499 | msg = self.session.send(stream, 'connect_reply', | |
429 | content, parent, ident) |
|
500 | content, parent, ident) | |
430 | self.log.debug(msg) |
|
501 | self.log.debug("%s", msg) | |
431 |
|
502 | |||
432 | def shutdown_request(self, ident, parent): |
|
503 | def shutdown_request(self, stream, ident, parent): | |
433 | self.shell.exit_now = True |
|
504 | self.shell.exit_now = True | |
|
505 | content = dict(status='ok') | |||
|
506 | content.update(parent['content']) | |||
|
507 | self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) | |||
|
508 | # same content, but different msg_id for broadcasting on IOPub | |||
434 | self._shutdown_message = self.session.msg(u'shutdown_reply', |
|
509 | self._shutdown_message = self.session.msg(u'shutdown_reply', | |
435 |
|
|
510 | content, parent | |
436 |
|
|
511 | ) | |
|
512 | ||||
|
513 | self._at_shutdown() | |||
|
514 | # call sys.exit after a short delay | |||
|
515 | loop = ioloop.IOLoop.instance() | |||
|
516 | loop.add_timeout(time.time()+0.1, loop.stop) | |||
|
517 | ||||
|
518 | #--------------------------------------------------------------------------- | |||
|
519 | # Engine methods | |||
|
520 | #--------------------------------------------------------------------------- | |||
|
521 | ||||
|
522 | def apply_request(self, stream, ident, parent): | |||
|
523 | try: | |||
|
524 | content = parent[u'content'] | |||
|
525 | bufs = parent[u'buffers'] | |||
|
526 | msg_id = parent['header']['msg_id'] | |||
|
527 | except: | |||
|
528 | self.log.error("Got bad msg: %s", parent, exc_info=True) | |||
|
529 | return | |||
|
530 | ||||
|
531 | # Set the parent message of the display hook and out streams. | |||
|
532 | self.shell.displayhook.set_parent(parent) | |||
|
533 | self.shell.display_pub.set_parent(parent) | |||
|
534 | sys.stdout.set_parent(parent) | |||
|
535 | sys.stderr.set_parent(parent) | |||
|
536 | ||||
|
537 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |||
|
538 | # self.iopub_socket.send(pyin_msg) | |||
|
539 | # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) | |||
|
540 | sub = self._make_subheader() | |||
|
541 | try: | |||
|
542 | working = self.shell.user_ns | |||
|
543 | ||||
|
544 | prefix = "_"+str(msg_id).replace("-","")+"_" | |||
|
545 | ||||
|
546 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |||
|
547 | ||||
|
548 | fname = getattr(f, '__name__', 'f') | |||
|
549 | ||||
|
550 | fname = prefix+"f" | |||
|
551 | argname = prefix+"args" | |||
|
552 | kwargname = prefix+"kwargs" | |||
|
553 | resultname = prefix+"result" | |||
|
554 | ||||
|
555 | ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } | |||
|
556 | # print ns | |||
|
557 | working.update(ns) | |||
|
558 | code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname) | |||
|
559 | try: | |||
|
560 | exec code in self.shell.user_global_ns, self.shell.user_ns | |||
|
561 | result = working.get(resultname) | |||
|
562 | finally: | |||
|
563 | for key in ns.iterkeys(): | |||
|
564 | working.pop(key) | |||
|
565 | ||||
|
566 | packed_result,buf = serialize_object(result) | |||
|
567 | result_buf = [packed_result]+buf | |||
|
568 | except: | |||
|
569 | exc_content = self._wrap_exception('apply') | |||
|
570 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |||
|
571 | self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent, | |||
|
572 | ident=self._topic('pyerr')) | |||
|
573 | reply_content = exc_content | |||
|
574 | result_buf = [] | |||
|
575 | ||||
|
576 | if exc_content['ename'] == 'UnmetDependency': | |||
|
577 | sub['dependencies_met'] = False | |||
|
578 | else: | |||
|
579 | reply_content = {'status' : 'ok'} | |||
|
580 | ||||
|
581 | # put 'ok'/'error' status in header, for scheduler introspection: | |||
|
582 | sub['status'] = reply_content['status'] | |||
|
583 | ||||
|
584 | # flush i/o | |||
|
585 | sys.stdout.flush() | |||
|
586 | sys.stderr.flush() | |||
|
587 | ||||
|
588 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |||
|
589 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |||
|
590 | ||||
|
591 | #--------------------------------------------------------------------------- | |||
|
592 | # Control messages | |||
|
593 | #--------------------------------------------------------------------------- | |||
|
594 | ||||
|
595 | def abort_request(self, stream, ident, parent): | |||
|
596 | """abort a specifig msg by id""" | |||
|
597 | msg_ids = parent['content'].get('msg_ids', None) | |||
|
598 | if isinstance(msg_ids, basestring): | |||
|
599 | msg_ids = [msg_ids] | |||
|
600 | if not msg_ids: | |||
|
601 | self.abort_queues() | |||
|
602 | for mid in msg_ids: | |||
|
603 | self.aborted.add(str(mid)) | |||
|
604 | ||||
|
605 | content = dict(status='ok') | |||
|
606 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | |||
|
607 | parent=parent, ident=ident) | |||
|
608 | self.log.debug("%s", reply_msg) | |||
|
609 | ||||
|
610 | def clear_request(self, stream, idents, parent): | |||
|
611 | """Clear our namespace.""" | |||
|
612 | self.shell.reset(False) | |||
|
613 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |||
|
614 | content = dict(status='ok')) | |||
|
615 | ||||
437 |
|
616 | |||
438 | #--------------------------------------------------------------------------- |
|
617 | #--------------------------------------------------------------------------- | |
439 | # Protected interface |
|
618 | # Protected interface | |
440 | #--------------------------------------------------------------------------- |
|
619 | #--------------------------------------------------------------------------- | |
441 |
|
620 | |||
442 | def _abort_queue(self): |
|
621 | ||
|
622 | def _wrap_exception(self, method=None): | |||
|
623 | # import here, because _wrap_exception is only used in parallel, | |||
|
624 | # and parallel has higher min pyzmq version | |||
|
625 | from IPython.parallel.error import wrap_exception | |||
|
626 | e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) | |||
|
627 | content = wrap_exception(e_info) | |||
|
628 | return content | |||
|
629 | ||||
|
630 | def _topic(self, topic): | |||
|
631 | """prefixed topic for IOPub messages""" | |||
|
632 | if self.int_id >= 0: | |||
|
633 | base = "engine.%i" % self.int_id | |||
|
634 | else: | |||
|
635 | base = "kernel.%s" % self.ident | |||
|
636 | ||||
|
637 | return py3compat.cast_bytes("%s.%s" % (base, topic)) | |||
|
638 | ||||
|
639 | def _abort_queues(self): | |||
|
640 | for stream in self.shell_streams: | |||
|
641 | if stream: | |||
|
642 | self._abort_queue(stream) | |||
|
643 | ||||
|
644 | def _abort_queue(self, stream): | |||
|
645 | poller = zmq.Poller() | |||
|
646 | poller.register(stream.socket, zmq.POLLIN) | |||
443 | while True: |
|
647 | while True: | |
444 | try: |
|
648 | idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) | |
445 | ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK) |
|
|||
446 | except Exception: |
|
|||
447 | self.log.warn("Invalid Message:", exc_info=True) |
|
|||
448 | continue |
|
|||
449 | if msg is None: |
|
649 | if msg is None: | |
450 |
|
|
650 | return | |
451 | else: |
|
|||
452 | assert ident is not None, \ |
|
|||
453 | "Unexpected missing message part." |
|
|||
454 |
|
651 | |||
455 |
self.log. |
|
652 | self.log.info("Aborting:") | |
|
653 | self.log.info("%s", msg) | |||
456 | msg_type = msg['header']['msg_type'] |
|
654 | msg_type = msg['header']['msg_type'] | |
457 | reply_type = msg_type.split('_')[0] + '_reply' |
|
655 | reply_type = msg_type.split('_')[0] + '_reply' | |
458 | reply_msg = self.session.send(self.shell_socket, reply_type, |
|
656 | ||
459 |
|
|
657 | status = {'status' : 'aborted'} | |
460 | self.log.debug(reply_msg) |
|
658 | sub = {'engine' : self.ident} | |
|
659 | sub.update(status) | |||
|
660 | reply_msg = self.session.send(stream, reply_type, subheader=sub, | |||
|
661 | content=status, parent=msg, ident=idents) | |||
|
662 | self.log.debug("%s", reply_msg) | |||
461 | # We need to wait a bit for requests to come in. This can probably |
|
663 | # We need to wait a bit for requests to come in. This can probably | |
462 | # be set shorter for true asynchronous clients. |
|
664 | # be set shorter for true asynchronous clients. | |
463 |
|
|
665 | poller.poll(50) | |
|
666 | ||||
464 |
|
667 | |||
465 | def _no_raw_input(self): |
|
668 | def _no_raw_input(self): | |
466 | """Raise StdinNotImplentedError if active frontend doesn't support |
|
669 | """Raise StdinNotImplentedError if active frontend doesn't support | |
@@ -490,7 +693,7 b' class Kernel(Configurable):' | |||||
490 | value = reply['content']['value'] |
|
693 | value = reply['content']['value'] | |
491 | except: |
|
694 | except: | |
492 | self.log.error("Got bad raw_input reply: ") |
|
695 | self.log.error("Got bad raw_input reply: ") | |
493 |
self.log.error( |
|
696 | self.log.error("%s", parent) | |
494 | value = '' |
|
697 | value = '' | |
495 | if value == '\x04': |
|
698 | if value == '\x04': | |
496 | # EOF |
|
699 | # EOF | |
@@ -545,12 +748,9 b' class Kernel(Configurable):' | |||||
545 | """ |
|
748 | """ | |
546 | # io.rprint("Kernel at_shutdown") # dbg |
|
749 | # io.rprint("Kernel at_shutdown") # dbg | |
547 | if self._shutdown_message is not None: |
|
750 | if self._shutdown_message is not None: | |
548 |
self.session.send(self. |
|
751 | self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) | |
549 |
self. |
|
752 | self.log.debug("%s", self._shutdown_message) | |
550 | self.log.debug(str(self._shutdown_message)) |
|
753 | [ s.flush(zmq.POLLOUT) for s in self.shell_streams ] | |
551 | # A very short sleep to give zmq time to flush its message buffers |
|
|||
552 | # before Python truly shuts down. |
|
|||
553 | time.sleep(0.01) |
|
|||
554 |
|
754 | |||
555 | #----------------------------------------------------------------------------- |
|
755 | #----------------------------------------------------------------------------- | |
556 | # Aliases and Flags for the IPKernelApp |
|
756 | # Aliases and Flags for the IPKernelApp | |
@@ -604,8 +804,10 b' class IPKernelApp(KernelApp, InteractiveShellApp):' | |||||
604 |
|
804 | |||
605 | def init_kernel(self): |
|
805 | def init_kernel(self): | |
606 |
|
806 | |||
|
807 | shell_stream = ZMQStream(self.shell_socket) | |||
|
808 | ||||
607 | kernel = Kernel(config=self.config, session=self.session, |
|
809 | kernel = Kernel(config=self.config, session=self.session, | |
608 |
shell_s |
|
810 | shell_streams=[shell_stream], | |
609 | iopub_socket=self.iopub_socket, |
|
811 | iopub_socket=self.iopub_socket, | |
610 | stdin_socket=self.stdin_socket, |
|
812 | stdin_socket=self.stdin_socket, | |
611 | log=self.log, |
|
813 | log=self.log, | |
@@ -687,6 +889,13 b' def embed_kernel(module=None, local_ns=None, **kwargs):' | |||||
687 | else: |
|
889 | else: | |
688 | app = IPKernelApp.instance(**kwargs) |
|
890 | app = IPKernelApp.instance(**kwargs) | |
689 | app.initialize([]) |
|
891 | app.initialize([]) | |
|
892 | # Undo unnecessary sys module mangling from init_sys_modules. | |||
|
893 | # This would not be necessary if we could prevent it | |||
|
894 | # in the first place by using a different InteractiveShell | |||
|
895 | # subclass, as in the regular embed case. | |||
|
896 | main = app.kernel.shell._orig_sys_modules_main_mod | |||
|
897 | if main is not None: | |||
|
898 | sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main | |||
690 |
|
899 | |||
691 | # load the calling scope if not given |
|
900 | # load the calling scope if not given | |
692 | (caller_module, caller_locals) = extract_module_locals(1) |
|
901 | (caller_module, caller_locals) = extract_module_locals(1) |
@@ -15,15 +15,18 b' Authors' | |||||
15 | # Imports |
|
15 | # Imports | |
16 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 |
|
17 | |||
18 |
# Standard library imports |
|
18 | # Standard library imports | |
|
19 | import atexit | |||
19 | import json |
|
20 | import json | |
20 | import os |
|
21 | import os | |
21 | import sys |
|
22 | import sys | |
|
23 | import signal | |||
22 |
|
24 | |||
23 |
# System library imports |
|
25 | # System library imports | |
24 | import zmq |
|
26 | import zmq | |
|
27 | from zmq.eventloop import ioloop | |||
25 |
|
28 | |||
26 |
# IPython imports |
|
29 | # IPython imports | |
27 | from IPython.core.ultratb import FormattedTB |
|
30 | from IPython.core.ultratb import FormattedTB | |
28 | from IPython.core.application import ( |
|
31 | from IPython.core.application import ( | |
29 | BaseIPythonApplication, base_flags, base_aliases, catch_config_error |
|
32 | BaseIPythonApplication, base_flags, base_aliases, catch_config_error | |
@@ -82,17 +85,18 b' kernel_flags.update(session_flags)' | |||||
82 | #----------------------------------------------------------------------------- |
|
85 | #----------------------------------------------------------------------------- | |
83 |
|
86 | |||
84 | class KernelApp(BaseIPythonApplication): |
|
87 | class KernelApp(BaseIPythonApplication): | |
85 |
name='p |
|
88 | name='ipkernel' | |
86 | aliases = Dict(kernel_aliases) |
|
89 | aliases = Dict(kernel_aliases) | |
87 | flags = Dict(kernel_flags) |
|
90 | flags = Dict(kernel_flags) | |
88 | classes = [Session] |
|
91 | classes = [Session] | |
89 | # the kernel class, as an importstring |
|
92 | # the kernel class, as an importstring | |
90 |
kernel_class = DottedObjectName('IPython.zmq.p |
|
93 | kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel') | |
91 | kernel = Any() |
|
94 | kernel = Any() | |
92 | poller = Any() # don't restrict this even though current pollers are all Threads |
|
95 | poller = Any() # don't restrict this even though current pollers are all Threads | |
93 | heartbeat = Instance(Heartbeat) |
|
96 | heartbeat = Instance(Heartbeat) | |
94 | session = Instance('IPython.zmq.session.Session') |
|
97 | session = Instance('IPython.zmq.session.Session') | |
95 | ports = Dict() |
|
98 | ports = Dict() | |
|
99 | _full_connection_file = Unicode() | |||
96 |
|
100 | |||
97 | # inherit config file name from parent: |
|
101 | # inherit config file name from parent: | |
98 | parent_appname = Unicode(config=True) |
|
102 | parent_appname = Unicode(config=True) | |
@@ -163,6 +167,8 b' class KernelApp(BaseIPythonApplication):' | |||||
163 | fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir]) |
|
167 | fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir]) | |
164 | except IOError: |
|
168 | except IOError: | |
165 | self.log.debug("Connection file not found: %s", self.connection_file) |
|
169 | self.log.debug("Connection file not found: %s", self.connection_file) | |
|
170 | # This means I own it, so I will clean it up: | |||
|
171 | atexit.register(self.cleanup_connection_file) | |||
166 | return |
|
172 | return | |
167 | self.log.debug(u"Loading connection file %s", fname) |
|
173 | self.log.debug(u"Loading connection file %s", fname) | |
168 | with open(fname) as f: |
|
174 | with open(fname) as f: | |
@@ -189,6 +195,16 b' class KernelApp(BaseIPythonApplication):' | |||||
189 | shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port, |
|
195 | shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port, | |
190 | iopub_port=self.iopub_port) |
|
196 | iopub_port=self.iopub_port) | |
191 |
|
197 | |||
|
198 | self._full_connection_file = cf | |||
|
199 | ||||
|
200 | def cleanup_connection_file(self): | |||
|
201 | cf = self._full_connection_file | |||
|
202 | self.log.debug("cleaning up connection file: %r", cf) | |||
|
203 | try: | |||
|
204 | os.remove(cf) | |||
|
205 | except (IOError, OSError): | |||
|
206 | pass | |||
|
207 | ||||
192 | def init_connection_file(self): |
|
208 | def init_connection_file(self): | |
193 | if not self.connection_file: |
|
209 | if not self.connection_file: | |
194 | self.connection_file = "kernel-%s.json"%os.getpid() |
|
210 | self.connection_file = "kernel-%s.json"%os.getpid() | |
@@ -217,17 +233,22 b' class KernelApp(BaseIPythonApplication):' | |||||
217 | self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) |
|
233 | self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) | |
218 | self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port) |
|
234 | self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port) | |
219 |
|
|
235 | ||
|
236 | def init_heartbeat(self): | |||
|
237 | """start the heart beating""" | |||
220 | # heartbeat doesn't share context, because it mustn't be blocked |
|
238 | # heartbeat doesn't share context, because it mustn't be blocked | |
221 | # by the GIL, which is accessed by libzmq when freeing zero-copy messages |
|
239 | # by the GIL, which is accessed by libzmq when freeing zero-copy messages | |
222 | hb_ctx = zmq.Context() |
|
240 | hb_ctx = zmq.Context() | |
223 | self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port)) |
|
241 | self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port)) | |
224 | self.hb_port = self.heartbeat.port |
|
242 | self.hb_port = self.heartbeat.port | |
225 | self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port) |
|
243 | self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port) | |
|
244 | self.heartbeat.start() | |||
226 |
|
245 | |||
227 | # Helper to make it easier to connect to an existing kernel. |
|
246 | # Helper to make it easier to connect to an existing kernel. | |
228 | # set log-level to critical, to make sure it is output |
|
247 | # set log-level to critical, to make sure it is output | |
229 | self.log.critical("To connect another client to this kernel, use:") |
|
248 | self.log.critical("To connect another client to this kernel, use:") | |
230 |
|
|
249 | ||
|
250 | def log_connection_info(self): | |||
|
251 | """display connection info, and store ports""" | |||
231 | basename = os.path.basename(self.connection_file) |
|
252 | basename = os.path.basename(self.connection_file) | |
232 | if basename == self.connection_file or \ |
|
253 | if basename == self.connection_file or \ | |
233 | os.path.dirname(self.connection_file) == self.profile_dir.security_dir: |
|
254 | os.path.dirname(self.connection_file) == self.profile_dir.security_dir: | |
@@ -267,6 +288,9 b' class KernelApp(BaseIPythonApplication):' | |||||
267 | displayhook_factory = import_item(str(self.displayhook_class)) |
|
288 | displayhook_factory = import_item(str(self.displayhook_class)) | |
268 | sys.displayhook = displayhook_factory(self.session, self.iopub_socket) |
|
289 | sys.displayhook = displayhook_factory(self.session, self.iopub_socket) | |
269 |
|
290 | |||
|
291 | def init_signal(self): | |||
|
292 | signal.signal(signal.SIGINT, signal.SIG_IGN) | |||
|
293 | ||||
270 | def init_kernel(self): |
|
294 | def init_kernel(self): | |
271 | """Create the Kernel object itself""" |
|
295 | """Create the Kernel object itself""" | |
272 | kernel_factory = import_item(str(self.kernel_class)) |
|
296 | kernel_factory = import_item(str(self.kernel_class)) | |
@@ -286,9 +310,12 b' class KernelApp(BaseIPythonApplication):' | |||||
286 | self.init_session() |
|
310 | self.init_session() | |
287 | self.init_poller() |
|
311 | self.init_poller() | |
288 | self.init_sockets() |
|
312 | self.init_sockets() | |
289 | # writing connection file must be *after* init_sockets |
|
313 | self.init_heartbeat() | |
|
314 | # writing/displaying connection info must be *after* init_sockets/heartbeat | |||
|
315 | self.log_connection_info() | |||
290 | self.write_connection_file() |
|
316 | self.write_connection_file() | |
291 | self.init_io() |
|
317 | self.init_io() | |
|
318 | self.init_signal() | |||
292 | self.init_kernel() |
|
319 | self.init_kernel() | |
293 | # flush stdout/stderr, so that anything written to these streams during |
|
320 | # flush stdout/stderr, so that anything written to these streams during | |
294 | # initialization do not get associated with the first execution request |
|
321 | # initialization do not get associated with the first execution request | |
@@ -296,11 +323,11 b' class KernelApp(BaseIPythonApplication):' | |||||
296 | sys.stderr.flush() |
|
323 | sys.stderr.flush() | |
297 |
|
324 | |||
298 | def start(self): |
|
325 | def start(self): | |
299 | self.heartbeat.start() |
|
|||
300 | if self.poller is not None: |
|
326 | if self.poller is not None: | |
301 | self.poller.start() |
|
327 | self.poller.start() | |
302 | try: |
|
|||
303 |
|
|
328 | self.kernel.start() | |
|
329 | try: | |||
|
330 | ioloop.IOLoop.instance().start() | |||
304 | except KeyboardInterrupt: |
|
331 | except KeyboardInterrupt: | |
305 | pass |
|
332 | pass | |
306 |
|
333 |
@@ -781,9 +781,6 b' class KernelManager(HasTraits):' | |||||
781 |
|
781 | |||
782 | Parameters: |
|
782 | Parameters: | |
783 | ----------- |
|
783 | ----------- | |
784 | ipython : bool, optional (default True) |
|
|||
785 | Whether to use an IPython kernel instead of a plain Python kernel. |
|
|||
786 |
|
||||
787 | launcher : callable, optional (default None) |
|
784 | launcher : callable, optional (default None) | |
788 | A custom function for launching the kernel process (generally a |
|
785 | A custom function for launching the kernel process (generally a | |
789 | wrapper around ``entry_point.base_launch_kernel``). In most cases, |
|
786 | wrapper around ``entry_point.base_launch_kernel``). In most cases, | |
@@ -805,10 +802,7 b' class KernelManager(HasTraits):' | |||||
805 | self._launch_args = kw.copy() |
|
802 | self._launch_args = kw.copy() | |
806 | launch_kernel = kw.pop('launcher', None) |
|
803 | launch_kernel = kw.pop('launcher', None) | |
807 | if launch_kernel is None: |
|
804 | if launch_kernel is None: | |
808 | if kw.pop('ipython', True): |
|
|||
809 |
|
|
805 | from ipkernel import launch_kernel | |
810 | else: |
|
|||
811 | from pykernel import launch_kernel |
|
|||
812 | self.kernel = launch_kernel(fname=self.connection_file, **kw) |
|
806 | self.kernel = launch_kernel(fname=self.connection_file, **kw) | |
813 |
|
807 | |||
814 | def shutdown_kernel(self, restart=False): |
|
808 | def shutdown_kernel(self, restart=False): |
@@ -629,7 +629,7 b' class Session(Configurable):' | |||||
629 | if isinstance(socket, ZMQStream): |
|
629 | if isinstance(socket, ZMQStream): | |
630 | socket = socket.socket |
|
630 | socket = socket.socket | |
631 | try: |
|
631 | try: | |
632 | msg_list = socket.recv_multipart(mode) |
|
632 | msg_list = socket.recv_multipart(mode, copy=copy) | |
633 | except zmq.ZMQError as e: |
|
633 | except zmq.ZMQError as e: | |
634 | if e.errno == zmq.EAGAIN: |
|
634 | if e.errno == zmq.EAGAIN: | |
635 | # We can convert EAGAIN to None as we know in this case |
|
635 | # We can convert EAGAIN to None as we know in this case |
@@ -23,7 +23,7 b' from subprocess import Popen, PIPE' | |||||
23 | import nose.tools as nt |
|
23 | import nose.tools as nt | |
24 |
|
24 | |||
25 | from IPython.zmq.blockingkernelmanager import BlockingKernelManager |
|
25 | from IPython.zmq.blockingkernelmanager import BlockingKernelManager | |
26 | from IPython.utils import path |
|
26 | from IPython.utils import path, py3compat | |
27 |
|
27 | |||
28 |
|
28 | |||
29 | #------------------------------------------------------------------------------- |
|
29 | #------------------------------------------------------------------------------- | |
@@ -68,17 +68,19 b' def setup_kernel(cmd):' | |||||
68 | ) |
|
68 | ) | |
69 | # wait for connection file to exist, timeout after 5s |
|
69 | # wait for connection file to exist, timeout after 5s | |
70 | tic = time.time() |
|
70 | tic = time.time() | |
71 |
while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + |
|
71 | while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10: | |
72 | time.sleep(0.1) |
|
72 | time.sleep(0.1) | |
73 |
|
73 | |||
|
74 | if kernel.poll() is not None: | |||
|
75 | o,e = kernel.communicate() | |||
|
76 | e = py3compat.cast_unicode(e) | |||
|
77 | raise IOError("Kernel failed to start:\n%s" % e) | |||
|
78 | ||||
74 | if not os.path.exists(connection_file): |
|
79 | if not os.path.exists(connection_file): | |
75 | if kernel.poll() is None: |
|
80 | if kernel.poll() is None: | |
76 | kernel.terminate() |
|
81 | kernel.terminate() | |
77 | raise IOError("Connection file %r never arrived" % connection_file) |
|
82 | raise IOError("Connection file %r never arrived" % connection_file) | |
78 |
|
83 | |||
79 | if kernel.poll() is not None: |
|
|||
80 | raise IOError("Kernel failed to start") |
|
|||
81 |
|
||||
82 | km = BlockingKernelManager(connection_file=connection_file) |
|
84 | km = BlockingKernelManager(connection_file=connection_file) | |
83 | km.load_connection_file() |
|
85 | km.load_connection_file() | |
84 | km.start_channels() |
|
86 | km.start_channels() | |
@@ -157,3 +159,33 b' def test_embed_kernel_namespace():' | |||||
157 | content = msg['content'] |
|
159 | content = msg['content'] | |
158 | nt.assert_false(content['found']) |
|
160 | nt.assert_false(content['found']) | |
159 |
|
161 | |||
|
162 | def test_embed_kernel_reentrant(): | |||
|
163 | """IPython.embed_kernel() can be called multiple times""" | |||
|
164 | cmd = '\n'.join([ | |||
|
165 | 'from IPython import embed_kernel', | |||
|
166 | 'count = 0', | |||
|
167 | 'def go():', | |||
|
168 | ' global count', | |||
|
169 | ' embed_kernel()', | |||
|
170 | ' count = count + 1', | |||
|
171 | '', | |||
|
172 | 'while True:' | |||
|
173 | ' go()', | |||
|
174 | '', | |||
|
175 | ]) | |||
|
176 | ||||
|
177 | with setup_kernel(cmd) as km: | |||
|
178 | shell = km.shell_channel | |||
|
179 | for i in range(5): | |||
|
180 | msg_id = shell.object_info('count') | |||
|
181 | msg = shell.get_msg(block=True, timeout=2) | |||
|
182 | content = msg['content'] | |||
|
183 | nt.assert_true(content['found']) | |||
|
184 | nt.assert_equals(content['string_form'], unicode(i)) | |||
|
185 | ||||
|
186 | # exit from embed_kernel | |||
|
187 | shell.execute("get_ipython().exit_now = True") | |||
|
188 | msg = shell.get_msg(block=True, timeout=2) | |||
|
189 | time.sleep(0.2) | |||
|
190 | ||||
|
191 |
@@ -19,8 +19,12 b' from __future__ import print_function' | |||||
19 | import inspect |
|
19 | import inspect | |
20 | import os |
|
20 | import os | |
21 | import sys |
|
21 | import sys | |
|
22 | import time | |||
22 | from subprocess import Popen, PIPE |
|
23 | from subprocess import Popen, PIPE | |
23 |
|
24 | |||
|
25 | # System library imports | |||
|
26 | from zmq.eventloop import ioloop | |||
|
27 | ||||
24 | # Our own |
|
28 | # Our own | |
25 | from IPython.core.interactiveshell import ( |
|
29 | from IPython.core.interactiveshell import ( | |
26 | InteractiveShell, InteractiveShellABC |
|
30 | InteractiveShell, InteractiveShellABC | |
@@ -39,7 +43,7 b' from IPython.utils import io' | |||||
39 | from IPython.utils.jsonutil import json_clean |
|
43 | from IPython.utils.jsonutil import json_clean | |
40 | from IPython.utils.path import get_py_filename |
|
44 | from IPython.utils.path import get_py_filename | |
41 | from IPython.utils.process import arg_split |
|
45 | from IPython.utils.process import arg_split | |
42 | from IPython.utils.traitlets import Instance, Type, Dict, CBool |
|
46 | from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes | |
43 | from IPython.utils.warn import warn, error |
|
47 | from IPython.utils.warn import warn, error | |
44 | from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary |
|
48 | from IPython.zmq.displayhook import ZMQShellDisplayHook, _encode_binary | |
45 | from IPython.zmq.session import extract_header |
|
49 | from IPython.zmq.session import extract_header | |
@@ -56,6 +60,7 b' class ZMQDisplayPublisher(DisplayPublisher):' | |||||
56 | session = Instance(Session) |
|
60 | session = Instance(Session) | |
57 | pub_socket = Instance('zmq.Socket') |
|
61 | pub_socket = Instance('zmq.Socket') | |
58 | parent_header = Dict({}) |
|
62 | parent_header = Dict({}) | |
|
63 | topic = CBytes(b'displaypub') | |||
59 |
|
64 | |||
60 | def set_parent(self, parent): |
|
65 | def set_parent(self, parent): | |
61 | """Set the parent for outbound messages.""" |
|
66 | """Set the parent for outbound messages.""" | |
@@ -78,7 +83,7 b' class ZMQDisplayPublisher(DisplayPublisher):' | |||||
78 | content['metadata'] = metadata |
|
83 | content['metadata'] = metadata | |
79 | self.session.send( |
|
84 | self.session.send( | |
80 | self.pub_socket, u'display_data', json_clean(content), |
|
85 | self.pub_socket, u'display_data', json_clean(content), | |
81 | parent=self.parent_header |
|
86 | parent=self.parent_header, ident=self.topic, | |
82 | ) |
|
87 | ) | |
83 |
|
88 | |||
84 | def clear_output(self, stdout=True, stderr=True, other=True): |
|
89 | def clear_output(self, stdout=True, stderr=True, other=True): | |
@@ -93,7 +98,7 b' class ZMQDisplayPublisher(DisplayPublisher):' | |||||
93 |
|
98 | |||
94 | self.session.send( |
|
99 | self.session.send( | |
95 | self.pub_socket, u'clear_output', content, |
|
100 | self.pub_socket, u'clear_output', content, | |
96 | parent=self.parent_header |
|
101 | parent=self.parent_header, ident=self.topic, | |
97 | ) |
|
102 | ) | |
98 |
|
103 | |||
99 | class ZMQInteractiveShell(InteractiveShell): |
|
104 | class ZMQInteractiveShell(InteractiveShell): | |
@@ -115,6 +120,12 b' class ZMQInteractiveShell(InteractiveShell):' | |||||
115 | def _exiter_default(self): |
|
120 | def _exiter_default(self): | |
116 | return ZMQExitAutocall(self) |
|
121 | return ZMQExitAutocall(self) | |
117 |
|
122 | |||
|
123 | def _exit_now_changed(self, name, old, new): | |||
|
124 | """stop eventloop when exit_now fires""" | |||
|
125 | if new: | |||
|
126 | loop = ioloop.IOLoop.instance() | |||
|
127 | loop.add_timeout(time.time()+0.1, loop.stop) | |||
|
128 | ||||
118 | keepkernel_on_exit = None |
|
129 | keepkernel_on_exit = None | |
119 |
|
130 | |||
120 | # Over ZeroMQ, GUI control isn't done with PyOS_InputHook as there is no |
|
131 | # Over ZeroMQ, GUI control isn't done with PyOS_InputHook as there is no | |
@@ -154,6 +165,7 b' class ZMQInteractiveShell(InteractiveShell):' | |||||
154 |
|
165 | |||
155 | def ask_exit(self): |
|
166 | def ask_exit(self): | |
156 | """Engage the exit actions.""" |
|
167 | """Engage the exit actions.""" | |
|
168 | self.exit_now = True | |||
157 | payload = dict( |
|
169 | payload = dict( | |
158 | source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit', |
|
170 | source='IPython.zmq.zmqshell.ZMQInteractiveShell.ask_exit', | |
159 | exit=True, |
|
171 | exit=True, | |
@@ -172,7 +184,11 b' class ZMQInteractiveShell(InteractiveShell):' | |||||
172 | dh = self.displayhook |
|
184 | dh = self.displayhook | |
173 | # Send exception info over pub socket for other clients than the caller |
|
185 | # Send exception info over pub socket for other clients than the caller | |
174 | # to pick up |
|
186 | # to pick up | |
175 | exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header) |
|
187 | topic = None | |
|
188 | if dh.topic: | |||
|
189 | topic = dh.topic.replace(b'pyout', b'pyerr') | |||
|
190 | ||||
|
191 | exc_msg = dh.session.send(dh.pub_socket, u'pyerr', json_clean(exc_content), dh.parent_header, ident=topic) | |||
176 |
|
192 | |||
177 | # FIXME - Hack: store exception info in shell object. Right now, the |
|
193 | # FIXME - Hack: store exception info in shell object. Right now, the | |
178 | # caller is reading this info after the fact, we need to fix this logic |
|
194 | # caller is reading this info after the fact, we need to fix this logic |
@@ -34,7 +34,7 b' TaskRecord keys:' | |||||
34 | =============== =============== ============= |
|
34 | =============== =============== ============= | |
35 | Key Type Description |
|
35 | Key Type Description | |
36 | =============== =============== ============= |
|
36 | =============== =============== ============= | |
37 |
msg_id uuid( |
|
37 | msg_id uuid(ascii) The msg ID | |
38 | header dict The request header |
|
38 | header dict The request header | |
39 | content dict The request content (likely empty) |
|
39 | content dict The request content (likely empty) | |
40 | buffers list(bytes) buffers containing serialized request objects |
|
40 | buffers list(bytes) buffers containing serialized request objects | |
@@ -43,7 +43,7 b" client_uuid uuid(bytes) IDENT of client's socket" | |||||
43 | engine_uuid uuid(bytes) IDENT of engine's socket |
|
43 | engine_uuid uuid(bytes) IDENT of engine's socket | |
44 | started datetime time task began execution on engine |
|
44 | started datetime time task began execution on engine | |
45 | completed datetime time task finished execution (success or failure) on engine |
|
45 | completed datetime time task finished execution (success or failure) on engine | |
46 |
resubmitted |
|
46 | resubmitted uuid(ascii) msg_id of resubmitted task (if applicable) | |
47 | result_header dict header for result |
|
47 | result_header dict header for result | |
48 | result_content dict content for result |
|
48 | result_content dict content for result | |
49 | result_buffers list(bytes) buffers containing serialized request objects |
|
49 | result_buffers list(bytes) buffers containing serialized request objects |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now