Show More
@@ -0,0 +1,71 b'' | |||
|
1 | """Publishing | |
|
2 | """ | |
|
3 | ||
|
4 | #----------------------------------------------------------------------------- | |
|
5 | # Copyright (C) 2012 The IPython Development Team | |
|
6 | # | |
|
7 | # Distributed under the terms of the BSD License. The full license is in | |
|
8 | # the file COPYING, distributed as part of this software. | |
|
9 | #----------------------------------------------------------------------------- | |
|
10 | ||
|
11 | #----------------------------------------------------------------------------- | |
|
12 | # Imports | |
|
13 | #----------------------------------------------------------------------------- | |
|
14 | ||
|
15 | from IPython.config import Configurable | |
|
16 | ||
|
17 | from IPython.utils.jsonutil import json_clean | |
|
18 | from IPython.utils.traitlets import Instance, Dict, CBytes | |
|
19 | ||
|
20 | from IPython.zmq.serialize import serialize_object | |
|
21 | from IPython.zmq.session import Session, extract_header | |
|
22 | ||
|
23 | #----------------------------------------------------------------------------- | |
|
24 | # Code | |
|
25 | #----------------------------------------------------------------------------- | |
|
26 | ||
|
27 | ||
|
class ZMQDataPublisher(Configurable):
    """Publish raw data objects as ``data_message`` messages on a zmq socket.

    ``session`` and ``pub_socket`` are expected to be assigned by the owner
    of this object before :meth:`publish_data` is called.
    """

    # passed as ``ident`` on every message sent by publish_data
    topic = CBytes(b'datapub')
    session = Instance(Session)
    pub_socket = Instance('zmq.Socket')
    # header extracted from the request that outbound messages belong to
    parent_header = Dict({})

    def set_parent(self, parent):
        """Set the parent for outbound messages."""
        self.parent_header = extract_header(parent)

    def publish_data(self, data):
        """publish a data_message on the IOPub channel

        The data itself travels in the message buffers; the message content
        only carries the list of keys.

        Parameters
        ----------

        data : dict
            The data to be published. Think of it as a namespace.
        """
        session = self.session
        buffers = serialize_object(data,
            buffer_threshold=session.buffer_threshold,
            item_threshold=session.item_threshold,
        )
        # materialise the keys: on Python 3 ``dict.keys()`` is a view object,
        # so hand json_clean a plain list instead
        content = json_clean(dict(keys=list(data.keys())))
        session.send(self.pub_socket, 'data_message', content=content,
            parent=self.parent_header,
            buffers=buffers,
            ident=self.topic,
        )
|
59 | ||
|
60 | ||
|
def publish_data(data):
    """Publish ``data`` through the data publisher of the running ZMQ shell.

    Parameters
    ----------

    data : dict
        The data to be published. Think of it as a namespace.
    """
    # zmqshell itself imports this module, so the import has to happen at
    # call time rather than at module import time
    from IPython.zmq.zmqshell import ZMQInteractiveShell
    shell = ZMQInteractiveShell.instance()
    publisher = shell.data_pub
    publisher.publish_data(data)
@@ -270,6 +270,7 b' class InteractiveShell(SingletonConfigurable):' | |||
|
270 | 270 | display_formatter = Instance(DisplayFormatter) |
|
271 | 271 | displayhook_class = Type(DisplayHook) |
|
272 | 272 | display_pub_class = Type(DisplayPublisher) |
|
273 | data_pub_class = None | |
|
273 | 274 | |
|
274 | 275 | exit_now = CBool(False) |
|
275 | 276 | exiter = Instance(ExitAutocall) |
@@ -483,6 +484,7 b' class InteractiveShell(SingletonConfigurable):' | |||
|
483 | 484 | self.init_prompts() |
|
484 | 485 | self.init_display_formatter() |
|
485 | 486 | self.init_display_pub() |
|
487 | self.init_data_pub() | |
|
486 | 488 | self.init_displayhook() |
|
487 | 489 | self.init_reload_doctest() |
|
488 | 490 | self.init_latextool() |
@@ -659,6 +661,13 b' class InteractiveShell(SingletonConfigurable):' | |||
|
659 | 661 | self.display_pub = self.display_pub_class(config=self.config) |
|
660 | 662 | self.configurables.append(self.display_pub) |
|
661 | 663 | |
|
def init_data_pub(self):
    """Create the data publisher, if a publisher class is configured.

    When ``data_pub_class`` is falsy, ``data_pub`` is set to None and
    nothing is registered.  Otherwise the class is instantiated with this
    object's config and the instance is added to ``configurables``.
    """
    if self.data_pub_class:
        self.data_pub = self.data_pub_class(config=self.config)
        self.configurables.append(self.data_pub)
    else:
        self.data_pub = None
|
670 | ||
|
662 | 671 | def init_displayhook(self): |
|
663 | 672 | # Initialize displayhook, set in/out prompts and printing system |
|
664 | 673 | self.displayhook = self.displayhook_class( |
@@ -83,7 +83,7 b' class AsyncResult(object):' | |||
|
83 | 83 | self._tracker = tracker |
|
84 | 84 | self._ready = False |
|
85 | 85 | self._success = None |
|
86 | self._metadata = None | |
|
86 | self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] | |
|
87 | 87 | if len(msg_ids) == 1: |
|
88 | 88 | self._single_result = not isinstance(targets, (list, tuple)) |
|
89 | 89 | else: |
@@ -126,6 +126,10 b' class AsyncResult(object):' | |||
|
126 | 126 | else: |
|
127 | 127 | raise error.TimeoutError("Result not ready.") |
|
128 | 128 | |
|
129 | def _check_ready(self): | |
|
130 | if not self.ready(): | |
|
131 | raise error.TimeoutError("Result not ready.") | |
|
132 | ||
|
129 | 133 | def ready(self): |
|
130 | 134 | """Return whether the call has completed.""" |
|
131 | 135 | if not self._ready: |
@@ -157,9 +161,8 b' class AsyncResult(object):' | |||
|
157 | 161 | else: |
|
158 | 162 | self._success = True |
|
159 | 163 | finally: |
|
160 | self._metadata = map(self._client.metadata.get, self.msg_ids) | |
|
161 | self._wait_for_outputs(10) | |
|
162 | 164 | |
|
165 | self._wait_for_outputs(10) | |
|
163 | 166 | |
|
164 | 167 | |
|
165 | 168 | def successful(self): |
@@ -192,14 +195,13 b' class AsyncResult(object):' | |||
|
192 | 195 | |
|
193 | 196 | @property |
|
194 | 197 | def result(self): |
|
195 |
"""result property wrapper for `get(timeout= |
|
|
198 | """result property wrapper for `get(timeout=-1)`.""" | |
|
196 | 199 | return self.get() |
|
197 | 200 | |
|
198 | 201 | # abbreviated alias: |
|
199 | 202 | r = result |
|
200 | 203 | |
|
201 | 204 | @property |
|
202 | @check_ready | |
|
203 | 205 | def metadata(self): |
|
204 | 206 | """property for accessing execution metadata.""" |
|
205 | 207 | if self._single_result: |
@@ -238,15 +240,17 b' class AsyncResult(object):' | |||
|
238 | 240 | # dict-access |
|
239 | 241 | #------------------------------------- |
|
240 | 242 | |
|
241 | @check_ready | |
|
242 | 243 | def __getitem__(self, key): |
|
243 | 244 | """getitem returns result value(s) if keyed by int/slice, or metadata if key is str. |
|
244 | 245 | """ |
|
245 | 246 | if isinstance(key, int): |
|
247 | self._check_ready() | |
|
246 | 248 | return error.collect_exceptions([self._result[key]], self._fname)[0] |
|
247 | 249 | elif isinstance(key, slice): |
|
250 | self._check_ready() | |
|
248 | 251 | return error.collect_exceptions(self._result[key], self._fname) |
|
249 | 252 | elif isinstance(key, basestring): |
|
253 | # metadata proxy *does not* require that results are done | |
|
250 | 254 | values = [ md[key] for md in self._metadata ] |
|
251 | 255 | if self._single_result: |
|
252 | 256 | return values[0] |
@@ -49,6 +49,7 b' from IPython.parallel import error' | |||
|
49 | 49 | from IPython.parallel import util |
|
50 | 50 | |
|
51 | 51 | from IPython.zmq.session import Session, Message |
|
52 | from IPython.zmq import serialize | |
|
52 | 53 | |
|
53 | 54 | from .asyncresult import AsyncResult, AsyncHubResult |
|
54 | 55 | from .view import DirectView, LoadBalancedView |
@@ -184,6 +185,7 b' class Metadata(dict):' | |||
|
184 | 185 | 'stdout' : '', |
|
185 | 186 | 'stderr' : '', |
|
186 | 187 | 'outputs' : [], |
|
188 | 'data': {}, | |
|
187 | 189 | 'outputs_ready' : False, |
|
188 | 190 | } |
|
189 | 191 | self.update(md) |
@@ -768,7 +770,7 b' class Client(HasTraits):' | |||
|
768 | 770 | |
|
769 | 771 | # construct result: |
|
770 | 772 | if content['status'] == 'ok': |
|
771 |
self.results[msg_id] = |
|
|
773 | self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0] | |
|
772 | 774 | elif content['status'] == 'aborted': |
|
773 | 775 | self.results[msg_id] = error.TaskAborted(msg_id) |
|
774 | 776 | elif content['status'] == 'resubmitted': |
@@ -864,6 +866,9 b' class Client(HasTraits):' | |||
|
864 | 866 | md['outputs'].append(content) |
|
865 | 867 | elif msg_type == 'pyout': |
|
866 | 868 | md['pyout'] = content |
|
869 | elif msg_type == 'data_message': | |
|
870 | data, remainder = serialize.unserialize_object(msg['buffers']) | |
|
871 | md['data'].update(data) | |
|
867 | 872 | elif msg_type == 'status': |
|
868 | 873 | # idle message comes after all outputs |
|
869 | 874 | if content['execution_state'] == 'idle': |
@@ -1209,7 +1214,7 b' class Client(HasTraits):' | |||
|
1209 | 1214 | if not isinstance(metadata, dict): |
|
1210 | 1215 | raise TypeError("metadata must be dict, not %s"%type(metadata)) |
|
1211 | 1216 | |
|
1212 |
bufs = |
|
|
1217 | bufs = serialize.pack_apply_message(f, args, kwargs, | |
|
1213 | 1218 | buffer_threshold=self.session.buffer_threshold, |
|
1214 | 1219 | item_threshold=self.session.item_threshold, |
|
1215 | 1220 | ) |
@@ -1538,7 +1543,7 b' class Client(HasTraits):' | |||
|
1538 | 1543 | |
|
1539 | 1544 | if rcontent['status'] == 'ok': |
|
1540 | 1545 | if header['msg_type'] == 'apply_reply': |
|
1541 |
res,buffers = |
|
|
1546 | res,buffers = serialize.unserialize_object(buffers) | |
|
1542 | 1547 | elif header['msg_type'] == 'execute_reply': |
|
1543 | 1548 | res = ExecuteReply(msg_id, rcontent, md) |
|
1544 | 1549 | else: |
@@ -861,6 +861,8 b' class Hub(SessionFactory):' | |||
|
861 | 861 | d[msg_type] = content |
|
862 | 862 | elif msg_type == 'status': |
|
863 | 863 | pass |
|
864 | elif msg_type == 'data_pub': | |
|
865 | self.log.info("ignored data_pub message for %s" % msg_id) | |
|
864 | 866 | else: |
|
865 | 867 | self.log.warn("unhandled iopub msg_type: %r", msg_type) |
|
866 | 868 |
@@ -81,13 +81,13 b' class AsyncResultTest(ClusterTestCase):' | |||
|
81 | 81 | |
|
82 | 82 | def test_getattr(self): |
|
83 | 83 | ar = self.client[:].apply_async(wait, 0.5) |
|
84 | self.assertEqual(ar.engine_id, [None] * len(ar)) | |
|
84 | 85 | self.assertRaises(AttributeError, lambda : ar._foo) |
|
85 | 86 | self.assertRaises(AttributeError, lambda : ar.__length_hint__()) |
|
86 | 87 | self.assertRaises(AttributeError, lambda : ar.foo) |
|
87 | self.assertRaises(AttributeError, lambda : ar.engine_id) | |
|
88 | 88 | self.assertFalse(hasattr(ar, '__length_hint__')) |
|
89 | 89 | self.assertFalse(hasattr(ar, 'foo')) |
|
90 |
self.assert |
|
|
90 | self.assertTrue(hasattr(ar, 'engine_id')) | |
|
91 | 91 | ar.get(5) |
|
92 | 92 | self.assertRaises(AttributeError, lambda : ar._foo) |
|
93 | 93 | self.assertRaises(AttributeError, lambda : ar.__length_hint__()) |
@@ -100,8 +100,8 b' class AsyncResultTest(ClusterTestCase):' | |||
|
100 | 100 | |
|
101 | 101 | def test_getitem(self): |
|
102 | 102 | ar = self.client[:].apply_async(wait, 0.5) |
|
103 | self.assertRaises(TimeoutError, lambda : ar['foo']) | |
|
104 |
self.assertRaises( |
|
|
103 | self.assertEqual(ar['engine_id'], [None] * len(ar)) | |
|
104 | self.assertRaises(KeyError, lambda : ar['foo']) | |
|
105 | 105 | ar.get(5) |
|
106 | 106 | self.assertRaises(KeyError, lambda : ar['foo']) |
|
107 | 107 | self.assertTrue(isinstance(ar['engine_id'], list)) |
@@ -109,8 +109,8 b' class AsyncResultTest(ClusterTestCase):' | |||
|
109 | 109 | |
|
110 | 110 | def test_single_result(self): |
|
111 | 111 | ar = self.client[-1].apply_async(wait, 0.5) |
|
112 |
self.assertRaises( |
|
|
113 |
self.assert |
|
|
112 | self.assertRaises(KeyError, lambda : ar['foo']) | |
|
113 | self.assertEqual(ar['engine_id'], None) | |
|
114 | 114 | self.assertTrue(ar.get(5) == 0.5) |
|
115 | 115 | self.assertTrue(isinstance(ar['engine_id'], int)) |
|
116 | 116 | self.assertTrue(isinstance(ar.engine_id, int)) |
@@ -606,4 +606,25 b' class TestView(ClusterTestCase, ParametricTestCase):' | |||
|
606 | 606 | ar = view.apply_async(bar) |
|
607 | 607 | r = ar.get(10) |
|
608 | 608 | self.assertEquals(r, 'foo') |
|
def test_data_pub_single(self):
    """Data published from a single engine is exposed as one dict."""
    source_lines = [
        'from IPython.zmq.datapub import publish_data',
        'for i in range(5):',
        ' publish_data(dict(i=i))',
    ]
    engine = self.client[-1]
    ar = engine.execute('\n'.join(source_lines), block=False)
    # ``data`` is already a dict before the result has been fetched
    self.assertTrue(isinstance(ar.data, dict))
    ar.get(5)
    # each publish updates the same namespace, so only the last i remains
    self.assertEqual(ar.data, dict(i=4))
|
619 | ||
|
def test_data_pub(self):
    """Data published from every engine is exposed as a list of dicts."""
    source_lines = [
        'from IPython.zmq.datapub import publish_data',
        'for i in range(5):',
        ' publish_data(dict(i=i))',
    ]
    engines = self.client[:]
    ar = engines.execute('\n'.join(source_lines), block=False)
    # one dict per engine, available before the results are fetched
    for engine_data in ar.data:
        self.assertTrue(isinstance(engine_data, dict))
    ar.get(5)
    # each publish updates the same namespace, so only the last i remains
    self.assertEqual(ar.data, [dict(i=4)] * len(ar))
|
609 | 630 |
@@ -147,6 +147,8 b' class Kernel(Configurable):' | |||
|
147 | 147 | self.shell.displayhook.topic = self._topic('pyout') |
|
148 | 148 | self.shell.display_pub.session = self.session |
|
149 | 149 | self.shell.display_pub.pub_socket = self.iopub_socket |
|
150 | self.shell.data_pub.session = self.session | |
|
151 | self.shell.data_pub.pub_socket = self.iopub_socket | |
|
150 | 152 | |
|
151 | 153 | # TMP - hack while developing |
|
152 | 154 | self.shell._reply_content = None |
@@ -353,6 +355,7 b' class Kernel(Configurable):' | |||
|
353 | 355 | # Set the parent message of the display hook and out streams. |
|
354 | 356 | shell.displayhook.set_parent(parent) |
|
355 | 357 | shell.display_pub.set_parent(parent) |
|
358 | shell.data_pub.set_parent(parent) | |
|
356 | 359 | sys.stdout.set_parent(parent) |
|
357 | 360 | sys.stderr.set_parent(parent) |
|
358 | 361 | |
@@ -538,6 +541,7 b' class Kernel(Configurable):' | |||
|
538 | 541 | shell = self.shell |
|
539 | 542 | shell.displayhook.set_parent(parent) |
|
540 | 543 | shell.display_pub.set_parent(parent) |
|
544 | shell.data_pub.set_parent(parent) | |
|
541 | 545 | sys.stdout.set_parent(parent) |
|
542 | 546 | sys.stderr.set_parent(parent) |
|
543 | 547 |
@@ -44,6 +44,7 b' from IPython.utils import py3compat' | |||
|
44 | 44 | from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes |
|
45 | 45 | from IPython.utils.warn import warn, error |
|
46 | 46 | from IPython.zmq.displayhook import ZMQShellDisplayHook |
|
47 | from IPython.zmq.datapub import ZMQDataPublisher | |
|
47 | 48 | from IPython.zmq.session import extract_header |
|
48 | 49 | from session import Session |
|
49 | 50 | |
@@ -464,6 +465,7 b' class ZMQInteractiveShell(InteractiveShell):' | |||
|
464 | 465 | |
|
465 | 466 | displayhook_class = Type(ZMQShellDisplayHook) |
|
466 | 467 | display_pub_class = Type(ZMQDisplayPublisher) |
|
468 | data_pub_class = Type(ZMQDataPublisher) | |
|
467 | 469 | |
|
468 | 470 | # Override the traitlet in the parent class, because there's no point using |
|
469 | 471 | # readline for the kernel. Can be removed when the readline code is moved |
@@ -763,6 +763,44 b' Message type: ``display_data``::' | |||
|
763 | 763 | 'metadata' : dict |
|
764 | 764 | } |
|
765 | 765 | |
|
766 | ||
|
767 | Raw Data Publication | |
|
768 | -------------------- | |
|
769 | ||
|
770 | ``display_data`` lets you publish *representations* of data, such as images and html. | |
|
771 | This ``data_pub`` message lets you publish *actual raw data*, sent via message buffers. | |
|
772 | ||
|
773 | data_pub messages are constructed via the :func:`IPython.zmq.datapub.publish_data` function: | |
|
774 | ||
|
775 | .. sourcecode:: python | |
|
776 | ||
|
777 | from IPython.zmq.datapub import publish_data | |
|
778 | ns = dict(x=my_array) | |
|
779 | publish_data(ns) | |
|
780 | ||
|
781 | ||
|
782 | Message type: ``data_message``:: | |
|
783 | ||
|
784 | content = { | |
|
785 | # the keys of the data dict, after it has been unserialized | |
|
786 | 'keys' : ['a', 'b'] | |
|
787 | } | |
|
788 | # the namespace dict will be serialized in the message buffers, | |
|
789 | # which will have a length of at least one | |
|
790 | buffers = ['pdict', ...] | |
|
791 | ||
|
792 | ||
|
793 | The interpretation of a sequence of data_pub messages for a given parent request should be | |
|
794 | to update a single namespace with subsequent results. | |
|
795 | ||
|
796 | .. note:: | |
|
797 | ||
|
798 | No frontends directly handle data_pub messages at this time. | |
|
799 | It is currently only used by the client/engines in :mod:`IPython.parallel`, | |
|
800 | where engines may publish *data* to the Client, | |
|
801 | and the Client can then publish *representations* of it via ``display_data`` | |
|
802 | to various frontends. | |
|
803 | ||
|
766 | 804 | Python inputs |
|
767 | 805 | ------------- |
|
768 | 806 |
@@ -30,7 +30,7 b' large blocking requests or database actions in the Hub do not have the ability t' | |||
|
30 | 30 | job submission and results. |
|
31 | 31 | |
|
32 | 32 | Registration (``ROUTER``) |
|
33 | *********************** | |
|
33 | ************************* | |
|
34 | 34 | |
|
35 | 35 | The first function of the Hub is to facilitate and monitor connections of clients |
|
36 | 36 | and engines. Both client and engine registration are handled by the same socket, so only |
@@ -111,7 +111,7 b' Message type : ``unregistration_notification``::' | |||
|
111 | 111 | |
|
112 | 112 | |
|
113 | 113 | Client Queries (``ROUTER``) |
|
114 | ************************* | |
|
114 | *************************** | |
|
115 | 115 | |
|
116 | 116 | The hub monitors and logs all queue traffic, so that clients can retrieve past |
|
117 | 117 | results or monitor pending tasks. This information may reside in-memory on the Hub, or |
@@ -242,8 +242,8 b' Message type: ``task_destination``::' | |||
|
242 | 242 | 'engine_id' : '1234-abcd-...', # the destination engine's zmq.IDENTITY |
|
243 | 243 | } |
|
244 | 244 | |
|
245 | :func:`apply` and :func:`apply_bound` | |
|
246 | ************************************* | |
|
245 | :func:`apply` | |
|
246 | ************* | |
|
247 | 247 | |
|
248 | 248 | In terms of message classes, the MUX scheduler and Task scheduler relay the exact same |
|
249 | 249 | message types. Their only difference lies in how the destination is selected. |
@@ -260,18 +260,17 b' code string, must be able to send arbitrary (pickleable) Python objects. And ide' | |||
|
260 | 260 | as little data as we can. The `buffers` property of a Message was introduced for this |
|
261 | 261 | purpose. |
|
262 | 262 | |
|
263 |
Utility method :func:`build_apply_message` in :mod:`IPython.zmq.s |
|
|
263 | Utility method :func:`pack_apply_message` in :mod:`IPython.zmq.serialize` wraps a | |
|
264 | 264 | function signature and builds a sendable buffer format for minimal data copying (exactly |
|
265 | 265 | zero copies of numpy array data or buffers or large strings). |
|
266 | 266 | |
|
267 | 267 | Message type: ``apply_request``:: |
|
268 | 268 | |
|
269 |
|
|
|
270 | 'bound' : True, # whether to execute in the engine's namespace or unbound | |
|
269 | metadata = { | |
|
271 | 270 | 'after' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() |
|
272 | 271 | 'follow' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() |
|
273 | ||
|
274 | 272 | } |
|
273 | content = {} | |
|
275 | 274 | buffers = ['...'] # at least 3 in length |
|
276 | 275 | # as built by pack_apply_message(f,args,kwargs)
|
277 | 276 | |
@@ -360,8 +359,8 b' Split Sends' | |||
|
360 | 359 | Previously, messages were bundled as a single json object and one call to |
|
361 | 360 | :func:`socket.send_json`. Since the hub inspects all messages, and doesn't need to |
|
362 | 361 | see the content of the messages, which can be large, messages are now serialized and sent in |
|
363 |
pieces. All messages are sent in at least |
|
|
364 |
|
|
|
362 | pieces. All messages are sent in at least 4 parts: the header, the parent header, the metadata and the content. | |
|
363 | This allows the controller to unpack and inspect the (always small) header, | |
|
365 | 364 | without spending time unpacking the content unless the message is bound for the |
|
366 | 365 | controller. Buffers are added on to the end of the message, and can be any objects that |
|
367 | 366 | present the buffer interface. |
General Comments 0
You need to be logged in to leave comments.
Login now