Show More
@@ -0,0 +1,71 b'' | |||||
|
1 | """Publishing | |||
|
2 | """ | |||
|
3 | ||||
|
4 | #----------------------------------------------------------------------------- | |||
|
5 | # Copyright (C) 2012 The IPython Development Team | |||
|
6 | # | |||
|
7 | # Distributed under the terms of the BSD License. The full license is in | |||
|
8 | # the file COPYING, distributed as part of this software. | |||
|
9 | #----------------------------------------------------------------------------- | |||
|
10 | ||||
|
11 | #----------------------------------------------------------------------------- | |||
|
12 | # Imports | |||
|
13 | #----------------------------------------------------------------------------- | |||
|
14 | ||||
|
15 | from IPython.config import Configurable | |||
|
16 | ||||
|
17 | from IPython.utils.jsonutil import json_clean | |||
|
18 | from IPython.utils.traitlets import Instance, Dict, CBytes | |||
|
19 | ||||
|
20 | from IPython.zmq.serialize import serialize_object | |||
|
21 | from IPython.zmq.session import Session, extract_header | |||
|
22 | ||||
|
23 | #----------------------------------------------------------------------------- | |||
|
24 | # Code | |||
|
25 | #----------------------------------------------------------------------------- | |||
|
26 | ||||
|
27 | ||||
|
28 | class ZMQDataPublisher(Configurable): | |||
|
29 | ||||
|
30 | topic = topic = CBytes(b'datapub') | |||
|
31 | session = Instance(Session) | |||
|
32 | pub_socket = Instance('zmq.Socket') | |||
|
33 | parent_header = Dict({}) | |||
|
34 | ||||
|
35 | def set_parent(self, parent): | |||
|
36 | """Set the parent for outbound messages.""" | |||
|
37 | self.parent_header = extract_header(parent) | |||
|
38 | ||||
|
39 | def publish_data(self, data): | |||
|
40 | """publish a data_message on the IOPub channel | |||
|
41 | ||||
|
42 | Parameters | |||
|
43 | ---------- | |||
|
44 | ||||
|
45 | data : dict | |||
|
46 | The data to be published. Think of it as a namespace. | |||
|
47 | """ | |||
|
48 | session = self.session | |||
|
49 | buffers = serialize_object(data, | |||
|
50 | buffer_threshold=session.buffer_threshold, | |||
|
51 | item_threshold=session.item_threshold, | |||
|
52 | ) | |||
|
53 | content = json_clean(dict(keys=data.keys())) | |||
|
54 | session.send(self.pub_socket, 'data_message', content=content, | |||
|
55 | parent=self.parent_header, | |||
|
56 | buffers=buffers, | |||
|
57 | ident=self.topic, | |||
|
58 | ) | |||
|
59 | ||||
|
60 | ||||
|
61 | def publish_data(data): | |||
|
62 | """publish a data_message on the IOPub channel | |||
|
63 | ||||
|
64 | Parameters | |||
|
65 | ---------- | |||
|
66 | ||||
|
67 | data : dict | |||
|
68 | The data to be published. Think of it as a namespace. | |||
|
69 | """ | |||
|
70 | from IPython.zmq.zmqshell import ZMQInteractiveShell | |||
|
71 | ZMQInteractiveShell.instance().data_pub.publish_data(data) |
@@ -270,6 +270,7 b' class InteractiveShell(SingletonConfigurable):' | |||||
270 | display_formatter = Instance(DisplayFormatter) |
|
270 | display_formatter = Instance(DisplayFormatter) | |
271 | displayhook_class = Type(DisplayHook) |
|
271 | displayhook_class = Type(DisplayHook) | |
272 | display_pub_class = Type(DisplayPublisher) |
|
272 | display_pub_class = Type(DisplayPublisher) | |
|
273 | data_pub_class = None | |||
273 |
|
274 | |||
274 | exit_now = CBool(False) |
|
275 | exit_now = CBool(False) | |
275 | exiter = Instance(ExitAutocall) |
|
276 | exiter = Instance(ExitAutocall) | |
@@ -483,6 +484,7 b' class InteractiveShell(SingletonConfigurable):' | |||||
483 | self.init_prompts() |
|
484 | self.init_prompts() | |
484 | self.init_display_formatter() |
|
485 | self.init_display_formatter() | |
485 | self.init_display_pub() |
|
486 | self.init_display_pub() | |
|
487 | self.init_data_pub() | |||
486 | self.init_displayhook() |
|
488 | self.init_displayhook() | |
487 | self.init_reload_doctest() |
|
489 | self.init_reload_doctest() | |
488 | self.init_latextool() |
|
490 | self.init_latextool() | |
@@ -659,6 +661,13 b' class InteractiveShell(SingletonConfigurable):' | |||||
659 | self.display_pub = self.display_pub_class(config=self.config) |
|
661 | self.display_pub = self.display_pub_class(config=self.config) | |
660 | self.configurables.append(self.display_pub) |
|
662 | self.configurables.append(self.display_pub) | |
661 |
|
663 | |||
|
664 | def init_data_pub(self): | |||
|
665 | if not self.data_pub_class: | |||
|
666 | self.data_pub = None | |||
|
667 | return | |||
|
668 | self.data_pub = self.data_pub_class(config=self.config) | |||
|
669 | self.configurables.append(self.data_pub) | |||
|
670 | ||||
662 | def init_displayhook(self): |
|
671 | def init_displayhook(self): | |
663 | # Initialize displayhook, set in/out prompts and printing system |
|
672 | # Initialize displayhook, set in/out prompts and printing system | |
664 | self.displayhook = self.displayhook_class( |
|
673 | self.displayhook = self.displayhook_class( |
@@ -83,7 +83,7 b' class AsyncResult(object):' | |||||
83 | self._tracker = tracker |
|
83 | self._tracker = tracker | |
84 | self._ready = False |
|
84 | self._ready = False | |
85 | self._success = None |
|
85 | self._success = None | |
86 | self._metadata = None |
|
86 | self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] | |
87 | if len(msg_ids) == 1: |
|
87 | if len(msg_ids) == 1: | |
88 | self._single_result = not isinstance(targets, (list, tuple)) |
|
88 | self._single_result = not isinstance(targets, (list, tuple)) | |
89 | else: |
|
89 | else: | |
@@ -126,6 +126,10 b' class AsyncResult(object):' | |||||
126 | else: |
|
126 | else: | |
127 | raise error.TimeoutError("Result not ready.") |
|
127 | raise error.TimeoutError("Result not ready.") | |
128 |
|
128 | |||
|
129 | def _check_ready(self): | |||
|
130 | if not self.ready(): | |||
|
131 | raise error.TimeoutError("Result not ready.") | |||
|
132 | ||||
129 | def ready(self): |
|
133 | def ready(self): | |
130 | """Return whether the call has completed.""" |
|
134 | """Return whether the call has completed.""" | |
131 | if not self._ready: |
|
135 | if not self._ready: | |
@@ -157,9 +161,8 b' class AsyncResult(object):' | |||||
157 | else: |
|
161 | else: | |
158 | self._success = True |
|
162 | self._success = True | |
159 | finally: |
|
163 | finally: | |
160 | self._metadata = map(self._client.metadata.get, self.msg_ids) |
|
|||
161 | self._wait_for_outputs(10) |
|
|||
162 |
|
164 | |||
|
165 | self._wait_for_outputs(10) | |||
163 |
|
166 | |||
164 |
|
167 | |||
165 | def successful(self): |
|
168 | def successful(self): | |
@@ -192,14 +195,13 b' class AsyncResult(object):' | |||||
192 |
|
195 | |||
193 | @property |
|
196 | @property | |
194 | def result(self): |
|
197 | def result(self): | |
195 |
"""result property wrapper for `get(timeout= |
|
198 | """result property wrapper for `get(timeout=-1)`.""" | |
196 | return self.get() |
|
199 | return self.get() | |
197 |
|
200 | |||
198 | # abbreviated alias: |
|
201 | # abbreviated alias: | |
199 | r = result |
|
202 | r = result | |
200 |
|
203 | |||
201 | @property |
|
204 | @property | |
202 | @check_ready |
|
|||
203 | def metadata(self): |
|
205 | def metadata(self): | |
204 | """property for accessing execution metadata.""" |
|
206 | """property for accessing execution metadata.""" | |
205 | if self._single_result: |
|
207 | if self._single_result: | |
@@ -238,15 +240,17 b' class AsyncResult(object):' | |||||
238 | # dict-access |
|
240 | # dict-access | |
239 | #------------------------------------- |
|
241 | #------------------------------------- | |
240 |
|
242 | |||
241 | @check_ready |
|
|||
242 | def __getitem__(self, key): |
|
243 | def __getitem__(self, key): | |
243 | """getitem returns result value(s) if keyed by int/slice, or metadata if key is str. |
|
244 | """getitem returns result value(s) if keyed by int/slice, or metadata if key is str. | |
244 | """ |
|
245 | """ | |
245 | if isinstance(key, int): |
|
246 | if isinstance(key, int): | |
|
247 | self._check_ready() | |||
246 | return error.collect_exceptions([self._result[key]], self._fname)[0] |
|
248 | return error.collect_exceptions([self._result[key]], self._fname)[0] | |
247 | elif isinstance(key, slice): |
|
249 | elif isinstance(key, slice): | |
|
250 | self._check_ready() | |||
248 | return error.collect_exceptions(self._result[key], self._fname) |
|
251 | return error.collect_exceptions(self._result[key], self._fname) | |
249 | elif isinstance(key, basestring): |
|
252 | elif isinstance(key, basestring): | |
|
253 | # metadata proxy *does not* require that results are done | |||
250 | values = [ md[key] for md in self._metadata ] |
|
254 | values = [ md[key] for md in self._metadata ] | |
251 | if self._single_result: |
|
255 | if self._single_result: | |
252 | return values[0] |
|
256 | return values[0] |
@@ -49,6 +49,7 b' from IPython.parallel import error' | |||||
49 | from IPython.parallel import util |
|
49 | from IPython.parallel import util | |
50 |
|
50 | |||
51 | from IPython.zmq.session import Session, Message |
|
51 | from IPython.zmq.session import Session, Message | |
|
52 | from IPython.zmq import serialize | |||
52 |
|
53 | |||
53 | from .asyncresult import AsyncResult, AsyncHubResult |
|
54 | from .asyncresult import AsyncResult, AsyncHubResult | |
54 | from .view import DirectView, LoadBalancedView |
|
55 | from .view import DirectView, LoadBalancedView | |
@@ -184,6 +185,7 b' class Metadata(dict):' | |||||
184 | 'stdout' : '', |
|
185 | 'stdout' : '', | |
185 | 'stderr' : '', |
|
186 | 'stderr' : '', | |
186 | 'outputs' : [], |
|
187 | 'outputs' : [], | |
|
188 | 'data': {}, | |||
187 | 'outputs_ready' : False, |
|
189 | 'outputs_ready' : False, | |
188 | } |
|
190 | } | |
189 | self.update(md) |
|
191 | self.update(md) | |
@@ -768,7 +770,7 b' class Client(HasTraits):' | |||||
768 |
|
770 | |||
769 | # construct result: |
|
771 | # construct result: | |
770 | if content['status'] == 'ok': |
|
772 | if content['status'] == 'ok': | |
771 |
self.results[msg_id] = |
|
773 | self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0] | |
772 | elif content['status'] == 'aborted': |
|
774 | elif content['status'] == 'aborted': | |
773 | self.results[msg_id] = error.TaskAborted(msg_id) |
|
775 | self.results[msg_id] = error.TaskAborted(msg_id) | |
774 | elif content['status'] == 'resubmitted': |
|
776 | elif content['status'] == 'resubmitted': | |
@@ -864,6 +866,9 b' class Client(HasTraits):' | |||||
864 | md['outputs'].append(content) |
|
866 | md['outputs'].append(content) | |
865 | elif msg_type == 'pyout': |
|
867 | elif msg_type == 'pyout': | |
866 | md['pyout'] = content |
|
868 | md['pyout'] = content | |
|
869 | elif msg_type == 'data_message': | |||
|
870 | data, remainder = serialize.unserialize_object(msg['buffers']) | |||
|
871 | md['data'].update(data) | |||
867 | elif msg_type == 'status': |
|
872 | elif msg_type == 'status': | |
868 | # idle message comes after all outputs |
|
873 | # idle message comes after all outputs | |
869 | if content['execution_state'] == 'idle': |
|
874 | if content['execution_state'] == 'idle': | |
@@ -1209,7 +1214,7 b' class Client(HasTraits):' | |||||
1209 | if not isinstance(metadata, dict): |
|
1214 | if not isinstance(metadata, dict): | |
1210 | raise TypeError("metadata must be dict, not %s"%type(metadata)) |
|
1215 | raise TypeError("metadata must be dict, not %s"%type(metadata)) | |
1211 |
|
1216 | |||
1212 |
bufs = |
|
1217 | bufs = serialize.pack_apply_message(f, args, kwargs, | |
1213 | buffer_threshold=self.session.buffer_threshold, |
|
1218 | buffer_threshold=self.session.buffer_threshold, | |
1214 | item_threshold=self.session.item_threshold, |
|
1219 | item_threshold=self.session.item_threshold, | |
1215 | ) |
|
1220 | ) | |
@@ -1538,7 +1543,7 b' class Client(HasTraits):' | |||||
1538 |
|
1543 | |||
1539 | if rcontent['status'] == 'ok': |
|
1544 | if rcontent['status'] == 'ok': | |
1540 | if header['msg_type'] == 'apply_reply': |
|
1545 | if header['msg_type'] == 'apply_reply': | |
1541 |
res,buffers = |
|
1546 | res,buffers = serialize.unserialize_object(buffers) | |
1542 | elif header['msg_type'] == 'execute_reply': |
|
1547 | elif header['msg_type'] == 'execute_reply': | |
1543 | res = ExecuteReply(msg_id, rcontent, md) |
|
1548 | res = ExecuteReply(msg_id, rcontent, md) | |
1544 | else: |
|
1549 | else: |
@@ -861,6 +861,8 b' class Hub(SessionFactory):' | |||||
861 | d[msg_type] = content |
|
861 | d[msg_type] = content | |
862 | elif msg_type == 'status': |
|
862 | elif msg_type == 'status': | |
863 | pass |
|
863 | pass | |
|
864 | elif msg_type == 'data_pub': | |||
|
865 | self.log.info("ignored data_pub message for %s" % msg_id) | |||
864 | else: |
|
866 | else: | |
865 | self.log.warn("unhandled iopub msg_type: %r", msg_type) |
|
867 | self.log.warn("unhandled iopub msg_type: %r", msg_type) | |
866 |
|
868 |
@@ -81,13 +81,13 b' class AsyncResultTest(ClusterTestCase):' | |||||
81 |
|
81 | |||
82 | def test_getattr(self): |
|
82 | def test_getattr(self): | |
83 | ar = self.client[:].apply_async(wait, 0.5) |
|
83 | ar = self.client[:].apply_async(wait, 0.5) | |
|
84 | self.assertEqual(ar.engine_id, [None] * len(ar)) | |||
84 | self.assertRaises(AttributeError, lambda : ar._foo) |
|
85 | self.assertRaises(AttributeError, lambda : ar._foo) | |
85 | self.assertRaises(AttributeError, lambda : ar.__length_hint__()) |
|
86 | self.assertRaises(AttributeError, lambda : ar.__length_hint__()) | |
86 | self.assertRaises(AttributeError, lambda : ar.foo) |
|
87 | self.assertRaises(AttributeError, lambda : ar.foo) | |
87 | self.assertRaises(AttributeError, lambda : ar.engine_id) |
|
|||
88 | self.assertFalse(hasattr(ar, '__length_hint__')) |
|
88 | self.assertFalse(hasattr(ar, '__length_hint__')) | |
89 | self.assertFalse(hasattr(ar, 'foo')) |
|
89 | self.assertFalse(hasattr(ar, 'foo')) | |
90 |
self.assert |
|
90 | self.assertTrue(hasattr(ar, 'engine_id')) | |
91 | ar.get(5) |
|
91 | ar.get(5) | |
92 | self.assertRaises(AttributeError, lambda : ar._foo) |
|
92 | self.assertRaises(AttributeError, lambda : ar._foo) | |
93 | self.assertRaises(AttributeError, lambda : ar.__length_hint__()) |
|
93 | self.assertRaises(AttributeError, lambda : ar.__length_hint__()) | |
@@ -100,8 +100,8 b' class AsyncResultTest(ClusterTestCase):' | |||||
100 |
|
100 | |||
101 | def test_getitem(self): |
|
101 | def test_getitem(self): | |
102 | ar = self.client[:].apply_async(wait, 0.5) |
|
102 | ar = self.client[:].apply_async(wait, 0.5) | |
103 | self.assertRaises(TimeoutError, lambda : ar['foo']) |
|
103 | self.assertEqual(ar['engine_id'], [None] * len(ar)) | |
104 |
self.assertRaises( |
|
104 | self.assertRaises(KeyError, lambda : ar['foo']) | |
105 | ar.get(5) |
|
105 | ar.get(5) | |
106 | self.assertRaises(KeyError, lambda : ar['foo']) |
|
106 | self.assertRaises(KeyError, lambda : ar['foo']) | |
107 | self.assertTrue(isinstance(ar['engine_id'], list)) |
|
107 | self.assertTrue(isinstance(ar['engine_id'], list)) | |
@@ -109,8 +109,8 b' class AsyncResultTest(ClusterTestCase):' | |||||
109 |
|
109 | |||
110 | def test_single_result(self): |
|
110 | def test_single_result(self): | |
111 | ar = self.client[-1].apply_async(wait, 0.5) |
|
111 | ar = self.client[-1].apply_async(wait, 0.5) | |
112 |
self.assertRaises( |
|
112 | self.assertRaises(KeyError, lambda : ar['foo']) | |
113 |
self.assert |
|
113 | self.assertEqual(ar['engine_id'], None) | |
114 | self.assertTrue(ar.get(5) == 0.5) |
|
114 | self.assertTrue(ar.get(5) == 0.5) | |
115 | self.assertTrue(isinstance(ar['engine_id'], int)) |
|
115 | self.assertTrue(isinstance(ar['engine_id'], int)) | |
116 | self.assertTrue(isinstance(ar.engine_id, int)) |
|
116 | self.assertTrue(isinstance(ar.engine_id, int)) |
@@ -606,4 +606,25 b' class TestView(ClusterTestCase, ParametricTestCase):' | |||||
606 | ar = view.apply_async(bar) |
|
606 | ar = view.apply_async(bar) | |
607 | r = ar.get(10) |
|
607 | r = ar.get(10) | |
608 | self.assertEquals(r, 'foo') |
|
608 | self.assertEquals(r, 'foo') | |
|
609 | def test_data_pub_single(self): | |||
|
610 | view = self.client[-1] | |||
|
611 | ar = view.execute('\n'.join([ | |||
|
612 | 'from IPython.zmq.datapub import publish_data', | |||
|
613 | 'for i in range(5):', | |||
|
614 | ' publish_data(dict(i=i))' | |||
|
615 | ]), block=False) | |||
|
616 | self.assertTrue(isinstance(ar.data, dict)) | |||
|
617 | ar.get(5) | |||
|
618 | self.assertEqual(ar.data, dict(i=4)) | |||
|
619 | ||||
|
620 | def test_data_pub(self): | |||
|
621 | view = self.client[:] | |||
|
622 | ar = view.execute('\n'.join([ | |||
|
623 | 'from IPython.zmq.datapub import publish_data', | |||
|
624 | 'for i in range(5):', | |||
|
625 | ' publish_data(dict(i=i))' | |||
|
626 | ]), block=False) | |||
|
627 | self.assertTrue(all(isinstance(d, dict) for d in ar.data)) | |||
|
628 | ar.get(5) | |||
|
629 | self.assertEqual(ar.data, [dict(i=4)] * len(ar)) | |||
609 |
|
630 |
@@ -147,6 +147,8 b' class Kernel(Configurable):' | |||||
147 | self.shell.displayhook.topic = self._topic('pyout') |
|
147 | self.shell.displayhook.topic = self._topic('pyout') | |
148 | self.shell.display_pub.session = self.session |
|
148 | self.shell.display_pub.session = self.session | |
149 | self.shell.display_pub.pub_socket = self.iopub_socket |
|
149 | self.shell.display_pub.pub_socket = self.iopub_socket | |
|
150 | self.shell.data_pub.session = self.session | |||
|
151 | self.shell.data_pub.pub_socket = self.iopub_socket | |||
150 |
|
152 | |||
151 | # TMP - hack while developing |
|
153 | # TMP - hack while developing | |
152 | self.shell._reply_content = None |
|
154 | self.shell._reply_content = None | |
@@ -353,6 +355,7 b' class Kernel(Configurable):' | |||||
353 | # Set the parent message of the display hook and out streams. |
|
355 | # Set the parent message of the display hook and out streams. | |
354 | shell.displayhook.set_parent(parent) |
|
356 | shell.displayhook.set_parent(parent) | |
355 | shell.display_pub.set_parent(parent) |
|
357 | shell.display_pub.set_parent(parent) | |
|
358 | shell.data_pub.set_parent(parent) | |||
356 | sys.stdout.set_parent(parent) |
|
359 | sys.stdout.set_parent(parent) | |
357 | sys.stderr.set_parent(parent) |
|
360 | sys.stderr.set_parent(parent) | |
358 |
|
361 | |||
@@ -538,6 +541,7 b' class Kernel(Configurable):' | |||||
538 | shell = self.shell |
|
541 | shell = self.shell | |
539 | shell.displayhook.set_parent(parent) |
|
542 | shell.displayhook.set_parent(parent) | |
540 | shell.display_pub.set_parent(parent) |
|
543 | shell.display_pub.set_parent(parent) | |
|
544 | shell.data_pub.set_parent(parent) | |||
541 | sys.stdout.set_parent(parent) |
|
545 | sys.stdout.set_parent(parent) | |
542 | sys.stderr.set_parent(parent) |
|
546 | sys.stderr.set_parent(parent) | |
543 |
|
547 |
@@ -44,6 +44,7 b' from IPython.utils import py3compat' | |||||
44 | from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes |
|
44 | from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes | |
45 | from IPython.utils.warn import warn, error |
|
45 | from IPython.utils.warn import warn, error | |
46 | from IPython.zmq.displayhook import ZMQShellDisplayHook |
|
46 | from IPython.zmq.displayhook import ZMQShellDisplayHook | |
|
47 | from IPython.zmq.datapub import ZMQDataPublisher | |||
47 | from IPython.zmq.session import extract_header |
|
48 | from IPython.zmq.session import extract_header | |
48 | from session import Session |
|
49 | from session import Session | |
49 |
|
50 | |||
@@ -464,6 +465,7 b' class ZMQInteractiveShell(InteractiveShell):' | |||||
464 |
|
465 | |||
465 | displayhook_class = Type(ZMQShellDisplayHook) |
|
466 | displayhook_class = Type(ZMQShellDisplayHook) | |
466 | display_pub_class = Type(ZMQDisplayPublisher) |
|
467 | display_pub_class = Type(ZMQDisplayPublisher) | |
|
468 | data_pub_class = Type(ZMQDataPublisher) | |||
467 |
|
469 | |||
468 | # Override the traitlet in the parent class, because there's no point using |
|
470 | # Override the traitlet in the parent class, because there's no point using | |
469 | # readline for the kernel. Can be removed when the readline code is moved |
|
471 | # readline for the kernel. Can be removed when the readline code is moved |
@@ -763,6 +763,44 b' Message type: ``display_data``::' | |||||
763 | 'metadata' : dict |
|
763 | 'metadata' : dict | |
764 | } |
|
764 | } | |
765 |
|
765 | |||
|
766 | ||||
|
767 | Raw Data Publication | |||
|
768 | -------------------- | |||
|
769 | ||||
|
770 | ``display_data`` lets you publish *representations* of data, such as images and html. | |||
|
771 | This ``data_pub`` message lets you publish *actual raw data*, sent via message buffers. | |||
|
772 | ||||
|
773 | data_pub messages are constructed via the :func:`IPython.lib.datapub.publish_data` function: | |||
|
774 | ||||
|
775 | .. sourcecode:: python | |||
|
776 | ||||
|
777 | from IPython.zmq.datapub import publish_data | |||
|
778 | ns = dict(x=my_array) | |||
|
779 | publish_data(ns) | |||
|
780 | ||||
|
781 | ||||
|
782 | Message type: ``data_pub``:: | |||
|
783 | ||||
|
784 | content = { | |||
|
785 | # the keys of the data dict, after it has been unserialized | |||
|
786 | keys = ['a', 'b'] | |||
|
787 | } | |||
|
788 | # the namespace dict will be serialized in the message buffers, | |||
|
789 | # which will have a length of at least one | |||
|
790 | buffers = ['pdict', ...] | |||
|
791 | ||||
|
792 | ||||
|
793 | The interpretation of a sequence of data_pub messages for a given parent request should be | |||
|
794 | to update a single namespace with subsequent results. | |||
|
795 | ||||
|
796 | .. note:: | |||
|
797 | ||||
|
798 | No frontends directly handle data_pub messages at this time. | |||
|
799 | It is currently only used by the client/engines in :mod:`IPython.parallel`, | |||
|
800 | where engines may publish *data* to the Client, | |||
|
801 | of which the Client can then publish *representations* via ``display_data`` | |||
|
802 | to various frontends. | |||
|
803 | ||||
766 | Python inputs |
|
804 | Python inputs | |
767 | ------------- |
|
805 | ------------- | |
768 |
|
806 |
@@ -30,7 +30,7 b' large blocking requests or database actions in the Hub do not have the ability t' | |||||
30 | job submission and results. |
|
30 | job submission and results. | |
31 |
|
31 | |||
32 | Registration (``ROUTER``) |
|
32 | Registration (``ROUTER``) | |
33 | *********************** |
|
33 | ************************* | |
34 |
|
34 | |||
35 | The first function of the Hub is to facilitate and monitor connections of clients |
|
35 | The first function of the Hub is to facilitate and monitor connections of clients | |
36 | and engines. Both client and engine registration are handled by the same socket, so only |
|
36 | and engines. Both client and engine registration are handled by the same socket, so only | |
@@ -111,7 +111,7 b' Message type : ``unregistration_notification``::' | |||||
111 |
|
111 | |||
112 |
|
112 | |||
113 | Client Queries (``ROUTER``) |
|
113 | Client Queries (``ROUTER``) | |
114 | ************************* |
|
114 | *************************** | |
115 |
|
115 | |||
116 | The hub monitors and logs all queue traffic, so that clients can retrieve past |
|
116 | The hub monitors and logs all queue traffic, so that clients can retrieve past | |
117 | results or monitor pending tasks. This information may reside in-memory on the Hub, or |
|
117 | results or monitor pending tasks. This information may reside in-memory on the Hub, or | |
@@ -242,8 +242,8 b' Message type: ``task_destination``::' | |||||
242 | 'engine_id' : '1234-abcd-...', # the destination engine's zmq.IDENTITY |
|
242 | 'engine_id' : '1234-abcd-...', # the destination engine's zmq.IDENTITY | |
243 | } |
|
243 | } | |
244 |
|
244 | |||
245 | :func:`apply` and :func:`apply_bound` |
|
245 | :func:`apply` | |
246 | ************************************* |
|
246 | ************* | |
247 |
|
247 | |||
248 | In terms of message classes, the MUX scheduler and Task scheduler relay the exact same |
|
248 | In terms of message classes, the MUX scheduler and Task scheduler relay the exact same | |
249 | message types. Their only difference lies in how the destination is selected. |
|
249 | message types. Their only difference lies in how the destination is selected. | |
@@ -260,18 +260,17 b' code string, must be able to send arbitrary (pickleable) Python objects. And ide' | |||||
260 | as little data as we can. The `buffers` property of a Message was introduced for this |
|
260 | as little data as we can. The `buffers` property of a Message was introduced for this | |
261 | purpose. |
|
261 | purpose. | |
262 |
|
262 | |||
263 |
Utility method :func:`build_apply_message` in :mod:`IPython.zmq.s |
|
263 | Utility method :func:`build_apply_message` in :mod:`IPython.zmq.serialize` wraps a | |
264 | function signature and builds a sendable buffer format for minimal data copying (exactly |
|
264 | function signature and builds a sendable buffer format for minimal data copying (exactly | |
265 | zero copies of numpy array data or buffers or large strings). |
|
265 | zero copies of numpy array data or buffers or large strings). | |
266 |
|
266 | |||
267 | Message type: ``apply_request``:: |
|
267 | Message type: ``apply_request``:: | |
268 |
|
268 | |||
269 |
|
|
269 | metadata = { | |
270 | 'bound' : True, # whether to execute in the engine's namespace or unbound |
|
|||
271 | 'after' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() |
|
270 | 'after' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() | |
272 | 'follow' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() |
|
271 | 'follow' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() | |
273 |
|
||||
274 | } |
|
272 | } | |
|
273 | content = {} | |||
275 | buffers = ['...'] # at least 3 in length |
|
274 | buffers = ['...'] # at least 3 in length | |
276 | # as built by build_apply_message(f,args,kwargs) |
|
275 | # as built by build_apply_message(f,args,kwargs) | |
277 |
|
276 | |||
@@ -360,8 +359,8 b' Split Sends' | |||||
360 | Previously, messages were bundled as a single json object and one call to |
|
359 | Previously, messages were bundled as a single json object and one call to | |
361 | :func:`socket.send_json`. Since the hub inspects all messages, and doesn't need to |
|
360 | :func:`socket.send_json`. Since the hub inspects all messages, and doesn't need to | |
362 | see the content of the messages, which can be large, messages are now serialized and sent in |
|
361 | see the content of the messages, which can be large, messages are now serialized and sent in | |
363 |
pieces. All messages are sent in at least |
|
362 | pieces. All messages are sent in at least 4 parts: the header, the parent header, the metadata and the content. | |
364 |
|
|
363 | This allows the controller to unpack and inspect the (always small) header, | |
365 | without spending time unpacking the content unless the message is bound for the |
|
364 | without spending time unpacking the content unless the message is bound for the | |
366 | controller. Buffers are added on to the end of the message, and can be any objects that |
|
365 | controller. Buffers are added on to the end of the message, and can be any objects that | |
367 | present the buffer interface. |
|
366 | present the buffer interface. |
General Comments 0
You need to be logged in to leave comments.
Login now