Merge pull request #2211 from minrk/datapub...
Min RK
r8123:dfcd243b merge
@@ -0,0 +1,71 b''
1 """Publishing data on the IOPub channel
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2012 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 from IPython.config import Configurable
16
17 from IPython.utils.jsonutil import json_clean
18 from IPython.utils.traitlets import Instance, Dict, CBytes
19
20 from IPython.zmq.serialize import serialize_object
21 from IPython.zmq.session import Session, extract_header
22
23 #-----------------------------------------------------------------------------
24 # Code
25 #-----------------------------------------------------------------------------
26
27
28 class ZMQDataPublisher(Configurable):
29
30 topic = CBytes(b'datapub')
31 session = Instance(Session)
32 pub_socket = Instance('zmq.Socket')
33 parent_header = Dict({})
34
35 def set_parent(self, parent):
36 """Set the parent for outbound messages."""
37 self.parent_header = extract_header(parent)
38
39 def publish_data(self, data):
40 """publish a data_message on the IOPub channel
41
42 Parameters
43 ----------
44
45 data : dict
46 The data to be published. Think of it as a namespace.
47 """
48 session = self.session
49 buffers = serialize_object(data,
50 buffer_threshold=session.buffer_threshold,
51 item_threshold=session.item_threshold,
52 )
53 content = json_clean(dict(keys=data.keys()))
54 session.send(self.pub_socket, 'data_message', content=content,
55 parent=self.parent_header,
56 buffers=buffers,
57 ident=self.topic,
58 )
59
60
61 def publish_data(data):
62 """publish a data_message on the IOPub channel
63
64 Parameters
65 ----------
66
67 data : dict
68 The data to be published. Think of it as a namespace.
69 """
70 from IPython.zmq.zmqshell import ZMQInteractiveShell
71 ZMQInteractiveShell.instance().data_pub.publish_data(data)
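
A minimal end-to-end sketch of how this publisher is exercised from :mod:`IPython.parallel`, modeled on the tests added further down in this diff (it assumes a cluster started with ``ipcluster`` is already running; the ``rc`` and ``view`` names are illustrative):

.. sourcecode:: python

    from IPython.parallel import Client

    rc = Client()              # assumes an ipcluster is already running
    view = rc[-1]              # DirectView on a single engine

    # The executed code runs on the engine and publishes intermediate data
    # on each iteration via the ZMQDataPublisher defined above.
    ar = view.execute('\n'.join([
        'from IPython.zmq.datapub import publish_data',
        'for i in range(5):',
        '    publish_data(dict(i=i))',
    ]), block=False)

    ar.get(5)                  # wait up to 5 seconds for completion
    print(ar.data)             # most recently published namespace, e.g. {'i': 4}
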
@@ -270,6 +270,7 b' class InteractiveShell(SingletonConfigurable):'
270 270 display_formatter = Instance(DisplayFormatter)
271 271 displayhook_class = Type(DisplayHook)
272 272 display_pub_class = Type(DisplayPublisher)
273 data_pub_class = None
273 274
274 275 exit_now = CBool(False)
275 276 exiter = Instance(ExitAutocall)
@@ -483,6 +484,7 b' class InteractiveShell(SingletonConfigurable):'
483 484 self.init_prompts()
484 485 self.init_display_formatter()
485 486 self.init_display_pub()
487 self.init_data_pub()
486 488 self.init_displayhook()
487 489 self.init_reload_doctest()
488 490 self.init_latextool()
@@ -659,6 +661,13 b' class InteractiveShell(SingletonConfigurable):'
659 661 self.display_pub = self.display_pub_class(config=self.config)
660 662 self.configurables.append(self.display_pub)
661 663
664 def init_data_pub(self):
665 if not self.data_pub_class:
666 self.data_pub = None
667 return
668 self.data_pub = self.data_pub_class(config=self.config)
669 self.configurables.append(self.data_pub)
670
662 671 def init_displayhook(self):
663 672 # Initialize displayhook, set in/out prompts and printing system
664 673 self.displayhook = self.displayhook_class(
@@ -83,7 +83,7 b' class AsyncResult(object):'
83 83 self._tracker = tracker
84 84 self._ready = False
85 85 self._success = None
86 self._metadata = None
86 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
87 87 if len(msg_ids) == 1:
88 88 self._single_result = not isinstance(targets, (list, tuple))
89 89 else:
@@ -126,6 +126,10 b' class AsyncResult(object):'
126 126 else:
127 127 raise error.TimeoutError("Result not ready.")
128 128
129 def _check_ready(self):
130 if not self.ready():
131 raise error.TimeoutError("Result not ready.")
132
129 133 def ready(self):
130 134 """Return whether the call has completed."""
131 135 if not self._ready:
@@ -157,9 +161,8 b' class AsyncResult(object):'
157 161 else:
158 162 self._success = True
159 163 finally:
160 self._metadata = map(self._client.metadata.get, self.msg_ids)
161 self._wait_for_outputs(10)
162 164
165 self._wait_for_outputs(10)
163 166
164 167
165 168 def successful(self):
@@ -192,14 +195,13 b' class AsyncResult(object):'
192 195
193 196 @property
194 197 def result(self):
195 """result property wrapper for `get(timeout=0)`."""
198 """result property wrapper for `get(timeout=-1)`."""
196 199 return self.get()
197 200
198 201 # abbreviated alias:
199 202 r = result
200 203
201 204 @property
202 @check_ready
203 205 def metadata(self):
204 206 """property for accessing execution metadata."""
205 207 if self._single_result:
@@ -238,15 +240,17 b' class AsyncResult(object):'
238 240 # dict-access
239 241 #-------------------------------------
240 242
241 @check_ready
242 243 def __getitem__(self, key):
243 244 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
244 245 """
245 246 if isinstance(key, int):
247 self._check_ready()
246 248 return error.collect_exceptions([self._result[key]], self._fname)[0]
247 249 elif isinstance(key, slice):
250 self._check_ready()
248 251 return error.collect_exceptions(self._result[key], self._fname)
249 252 elif isinstance(key, basestring):
253 # metadata proxy *does not* require that results are done
250 254 values = [ md[key] for md in self._metadata ]
251 255 if self._single_result:
252 256 return values[0]
@@ -49,6 +49,7 b' from IPython.parallel import error'
49 49 from IPython.parallel import util
50 50
51 51 from IPython.zmq.session import Session, Message
52 from IPython.zmq import serialize
52 53
53 54 from .asyncresult import AsyncResult, AsyncHubResult
54 55 from .view import DirectView, LoadBalancedView
@@ -184,6 +185,7 b' class Metadata(dict):'
184 185 'stdout' : '',
185 186 'stderr' : '',
186 187 'outputs' : [],
188 'data': {},
187 189 'outputs_ready' : False,
188 190 }
189 191 self.update(md)
@@ -768,7 +770,7 b' class Client(HasTraits):'
768 770
769 771 # construct result:
770 772 if content['status'] == 'ok':
771 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
773 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
772 774 elif content['status'] == 'aborted':
773 775 self.results[msg_id] = error.TaskAborted(msg_id)
774 776 elif content['status'] == 'resubmitted':
@@ -864,6 +866,9 b' class Client(HasTraits):'
864 866 md['outputs'].append(content)
865 867 elif msg_type == 'pyout':
866 868 md['pyout'] = content
869 elif msg_type == 'data_message':
870 data, remainder = serialize.unserialize_object(msg['buffers'])
871 md['data'].update(data)
867 872 elif msg_type == 'status':
868 873 # idle message comes after all outputs
869 874 if content['execution_state'] == 'idle':
@@ -1209,7 +1214,7 b' class Client(HasTraits):'
1209 1214 if not isinstance(metadata, dict):
1210 1215 raise TypeError("metadata must be dict, not %s"%type(metadata))
1211 1216
1212 bufs = util.pack_apply_message(f, args, kwargs,
1217 bufs = serialize.pack_apply_message(f, args, kwargs,
1213 1218 buffer_threshold=self.session.buffer_threshold,
1214 1219 item_threshold=self.session.item_threshold,
1215 1220 )
@@ -1538,7 +1543,7 b' class Client(HasTraits):'
1538 1543
1539 1544 if rcontent['status'] == 'ok':
1540 1545 if header['msg_type'] == 'apply_reply':
1541 res,buffers = util.unserialize_object(buffers)
1546 res,buffers = serialize.unserialize_object(buffers)
1542 1547 elif header['msg_type'] == 'execute_reply':
1543 1548 res = ExecuteReply(msg_id, rcontent, md)
1544 1549 else:
@@ -861,6 +861,8 b' class Hub(SessionFactory):'
861 861 d[msg_type] = content
862 862 elif msg_type == 'status':
863 863 pass
864 elif msg_type == 'data_pub':
865 self.log.info("ignored data_pub message for %s", msg_id)
864 866 else:
865 867 self.log.warn("unhandled iopub msg_type: %r", msg_type)
866 868
@@ -81,13 +81,13 b' class AsyncResultTest(ClusterTestCase):'
81 81
82 82 def test_getattr(self):
83 83 ar = self.client[:].apply_async(wait, 0.5)
84 self.assertEqual(ar.engine_id, [None] * len(ar))
84 85 self.assertRaises(AttributeError, lambda : ar._foo)
85 86 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
86 87 self.assertRaises(AttributeError, lambda : ar.foo)
87 self.assertRaises(AttributeError, lambda : ar.engine_id)
88 88 self.assertFalse(hasattr(ar, '__length_hint__'))
89 89 self.assertFalse(hasattr(ar, 'foo'))
90 self.assertFalse(hasattr(ar, 'engine_id'))
90 self.assertTrue(hasattr(ar, 'engine_id'))
91 91 ar.get(5)
92 92 self.assertRaises(AttributeError, lambda : ar._foo)
93 93 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
@@ -100,8 +100,8 b' class AsyncResultTest(ClusterTestCase):'
100 100
101 101 def test_getitem(self):
102 102 ar = self.client[:].apply_async(wait, 0.5)
103 self.assertRaises(TimeoutError, lambda : ar['foo'])
104 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
103 self.assertEqual(ar['engine_id'], [None] * len(ar))
104 self.assertRaises(KeyError, lambda : ar['foo'])
105 105 ar.get(5)
106 106 self.assertRaises(KeyError, lambda : ar['foo'])
107 107 self.assertTrue(isinstance(ar['engine_id'], list))
@@ -109,8 +109,8 b' class AsyncResultTest(ClusterTestCase):'
109 109
110 110 def test_single_result(self):
111 111 ar = self.client[-1].apply_async(wait, 0.5)
112 self.assertRaises(TimeoutError, lambda : ar['foo'])
113 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
112 self.assertRaises(KeyError, lambda : ar['foo'])
113 self.assertEqual(ar['engine_id'], None)
114 114 self.assertTrue(ar.get(5) == 0.5)
115 115 self.assertTrue(isinstance(ar['engine_id'], int))
116 116 self.assertTrue(isinstance(ar.engine_id, int))
@@ -606,4 +606,25 b' class TestView(ClusterTestCase, ParametricTestCase):'
606 606 ar = view.apply_async(bar)
607 607 r = ar.get(10)
608 608 self.assertEquals(r, 'foo')
609 def test_data_pub_single(self):
610 view = self.client[-1]
611 ar = view.execute('\n'.join([
612 'from IPython.zmq.datapub import publish_data',
613 'for i in range(5):',
614 ' publish_data(dict(i=i))'
615 ]), block=False)
616 self.assertTrue(isinstance(ar.data, dict))
617 ar.get(5)
618 self.assertEqual(ar.data, dict(i=4))
619
620 def test_data_pub(self):
621 view = self.client[:]
622 ar = view.execute('\n'.join([
623 'from IPython.zmq.datapub import publish_data',
624 'for i in range(5):',
625 ' publish_data(dict(i=i))'
626 ]), block=False)
627 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
628 ar.get(5)
629 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
609 630
@@ -147,6 +147,8 b' class Kernel(Configurable):'
147 147 self.shell.displayhook.topic = self._topic('pyout')
148 148 self.shell.display_pub.session = self.session
149 149 self.shell.display_pub.pub_socket = self.iopub_socket
150 self.shell.data_pub.session = self.session
151 self.shell.data_pub.pub_socket = self.iopub_socket
150 152
151 153 # TMP - hack while developing
152 154 self.shell._reply_content = None
@@ -353,6 +355,7 b' class Kernel(Configurable):'
353 355 # Set the parent message of the display hook and out streams.
354 356 shell.displayhook.set_parent(parent)
355 357 shell.display_pub.set_parent(parent)
358 shell.data_pub.set_parent(parent)
356 359 sys.stdout.set_parent(parent)
357 360 sys.stderr.set_parent(parent)
358 361
@@ -538,6 +541,7 b' class Kernel(Configurable):'
538 541 shell = self.shell
539 542 shell.displayhook.set_parent(parent)
540 543 shell.display_pub.set_parent(parent)
544 shell.data_pub.set_parent(parent)
541 545 sys.stdout.set_parent(parent)
542 546 sys.stderr.set_parent(parent)
543 547
@@ -44,6 +44,7 b' from IPython.utils import py3compat'
44 44 from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes
45 45 from IPython.utils.warn import warn, error
46 46 from IPython.zmq.displayhook import ZMQShellDisplayHook
47 from IPython.zmq.datapub import ZMQDataPublisher
47 48 from IPython.zmq.session import extract_header
48 49 from session import Session
49 50
@@ -464,6 +465,7 b' class ZMQInteractiveShell(InteractiveShell):'
464 465
465 466 displayhook_class = Type(ZMQShellDisplayHook)
466 467 display_pub_class = Type(ZMQDisplayPublisher)
468 data_pub_class = Type(ZMQDataPublisher)
467 469
468 470 # Override the traitlet in the parent class, because there's no point using
469 471 # readline for the kernel. Can be removed when the readline code is moved
@@ -763,6 +763,44 b' Message type: ``display_data``::'
763 763 'metadata' : dict
764 764 }
765 765
766
767 Raw Data Publication
768 --------------------
769
770 ``display_data`` lets you publish *representations* of data, such as images and HTML.
771 The ``data_pub`` message lets you publish the *actual raw data*, sent via message buffers.
772
773 ``data_pub`` messages are constructed via the :func:`IPython.zmq.datapub.publish_data` function:
774
775 .. sourcecode:: python
776
777 from IPython.zmq.datapub import publish_data
778 ns = dict(x=my_array)
779 publish_data(ns)
780
781
782 Message type: ``data_pub``::
783
784 content = {
785       # the keys of the data dict that the buffers unserialize to
786       'keys' : ['a', 'b']
787 }
788 # the namespace dict will be serialized in the message buffers,
789 # which will have a length of at least one
790 buffers = ['pdict', ...]
791
792
793 A sequence of data_pub messages for a given parent request should be interpreted as
794 incremental updates to a single namespace: later messages overwrite keys set by earlier ones.
795
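As a rough sketch (no frontend implements this today; see the note below), a consumer could fold such a stream into one namespace with :func:`unserialize_object` from :mod:`IPython.zmq.serialize`, mirroring what the parallel Client does elsewhere in this diff:

.. sourcecode:: python

    from IPython.zmq.serialize import unserialize_object

    namespace = {}                      # one namespace per parent request

    def handle_data_pub(msg):
        # msg['content']['keys'] lists the names; the objects themselves
        # travel in the message buffers and are rebuilt here.
        data, remainder = unserialize_object(msg['buffers'])
        namespace.update(data)          # later messages overwrite earlier keys
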
796 .. note::
797
798 No frontends directly handle data_pub messages at this time.
799 It is currently used only by the client and engines in :mod:`IPython.parallel`,
800 where engines may publish *data* to the Client,
801 and the Client can in turn publish *representations* of that data
802 to various frontends via ``display_data``.
803
766 804 Python inputs
767 805 -------------
768 806
@@ -30,7 +30,7 b' large blocking requests or database actions in the Hub do not have the ability t'
30 30 job submission and results.
31 31
32 32 Registration (``ROUTER``)
33 ***********************
33 *************************
34 34
35 35 The first function of the Hub is to facilitate and monitor connections of clients
36 36 and engines. Both client and engine registration are handled by the same socket, so only
@@ -111,7 +111,7 b' Message type : ``unregistration_notification``::'
111 111
112 112
113 113 Client Queries (``ROUTER``)
114 *************************
114 ***************************
115 115
116 116 The hub monitors and logs all queue traffic, so that clients can retrieve past
117 117 results or monitor pending tasks. This information may reside in-memory on the Hub, or
@@ -242,8 +242,8 b' Message type: ``task_destination``::'
242 242 'engine_id' : '1234-abcd-...', # the destination engine's zmq.IDENTITY
243 243 }
244 244
245 :func:`apply` and :func:`apply_bound`
246 *************************************
245 :func:`apply`
246 *************
247 247
248 248 In terms of message classes, the MUX scheduler and Task scheduler relay the exact same
249 249 message types. Their only difference lies in how the destination is selected.
@@ -260,18 +260,17 b' code string, must be able to send arbitrary (pickleable) Python objects. And ide'
260 260 as little data as we can. The `buffers` property of a Message was introduced for this
261 261 purpose.
262 262
263 Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a
263 The utility function :func:`pack_apply_message` in :mod:`IPython.zmq.serialize` wraps a
264 264 function signature and builds a sendable buffer format for minimal data copying (exactly
265 265 zero copies of numpy array data or buffers or large strings).
266 266
267 267 Message type: ``apply_request``::
268 268
269 content = {
270 'bound' : True, # whether to execute in the engine's namespace or unbound
269 metadata = {
271 270 'after' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict()
272 271 'follow' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict()
273
274 272 }
273 content = {}
275 274 buffers = ['...'] # at least 3 in length
276 275 # as built by build_apply_message(f,args,kwargs)
277 276
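For illustration, a rough round trip through this buffer format, based on the :func:`pack_apply_message` call visible in ``client.py`` earlier in this diff; the threshold values are arbitrary placeholders, and :func:`unpack_apply_message` is assumed to mirror the packing step:

.. sourcecode:: python

    from IPython.zmq.serialize import pack_apply_message, unpack_apply_message

    def work(a, b, scale=1.0):
        return (a + b) * scale

    # Client side: flatten f, args, and kwargs into sendable buffers
    # (at least 3 frames, as noted above).
    bufs = pack_apply_message(work, (1, 2), dict(scale=0.5),
                              buffer_threshold=1024, item_threshold=64)

    # Engine side: rebuild the call from the same buffers and run it.
    f, args, kwargs = unpack_apply_message(bufs)
    result = f(*args, **kwargs)         # 1.5
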
@@ -360,8 +359,8 b' Split Sends'
360 359 Previously, messages were bundled as a single json object and one call to
361 360 :func:`socket.send_json`. Since the hub inspects all messages, and doesn't need to
362 361 see the content of the messages, which can be large, messages are now serialized and sent in
363 pieces. All messages are sent in at least 3 parts: the header, the parent header, and the
364 content. This allows the controller to unpack and inspect the (always small) header,
362 pieces. All messages are sent in at least 4 parts: the header, the parent header, the metadata and the content.
363 This allows the controller to unpack and inspect the (always small) header,
365 364 without spending time unpacking the content unless the message is bound for the
366 365 controller. Buffers are added on to the end of the message, and can be any objects that
367 366 present the buffer interface.
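
To make the framing concrete, a schematic of the multipart layout described above (routing identities and the signature frame are omitted; this is a sketch of the ordering, not a byte-exact wire dump):

.. sourcecode:: python

    frames = [
        b'<header>',          # always small; the only part the Hub must inspect
        b'<parent_header>',
        b'<metadata>',
        b'<content>',         # unpacked only by the final recipient
        b'<buffer 0>',        # zero or more raw buffers appended at the end
        # ...
    ]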