From dfcd243bd2e8cf7a0bb1e4884e1c2c0b9cb92013 2012-08-02 17:30:21 From: Min RK Date: 2012-08-02 17:30:21 Subject: [PATCH] Merge pull request #2211 from minrk/datapub add data publication message Functions just like displaypub, but sends a namespace of actual data instead of representations. This uses the serialization/zero-copy machinery of the parallel code. The current interpretation of a sequence of data publications within a cell is updates of a single namespace. That is, a series of calls to publish_data(dict(A=...)) during a given cell will result in a single dict with the latest value of A, updated in-place. An alternate interpretation could be to keep appending to a list, but I expect the current update approach to be preferable. Changes along the way: AsyncResults no longer protect metadata access while results are pending. This was an artificial limitation, and impedes informed access of incomplete metadata, which actually works just fine. --- diff --git a/IPython/core/interactiveshell.py b/IPython/core/interactiveshell.py index 4a24fe3..d0d4796 100644 --- a/IPython/core/interactiveshell.py +++ b/IPython/core/interactiveshell.py @@ -270,6 +270,7 @@ class InteractiveShell(SingletonConfigurable): display_formatter = Instance(DisplayFormatter) displayhook_class = Type(DisplayHook) display_pub_class = Type(DisplayPublisher) + data_pub_class = None exit_now = CBool(False) exiter = Instance(ExitAutocall) @@ -483,6 +484,7 @@ class InteractiveShell(SingletonConfigurable): self.init_prompts() self.init_display_formatter() self.init_display_pub() + self.init_data_pub() self.init_displayhook() self.init_reload_doctest() self.init_latextool() @@ -659,6 +661,13 @@ class InteractiveShell(SingletonConfigurable): self.display_pub = self.display_pub_class(config=self.config) self.configurables.append(self.display_pub) + def init_data_pub(self): + if not self.data_pub_class: + self.data_pub = None + return + self.data_pub = self.data_pub_class(config=self.config) + 
self.configurables.append(self.data_pub) + def init_displayhook(self): # Initialize displayhook, set in/out prompts and printing system self.displayhook = self.displayhook_class( diff --git a/IPython/parallel/client/asyncresult.py b/IPython/parallel/client/asyncresult.py index ec4f0d5..338cbc6 100644 --- a/IPython/parallel/client/asyncresult.py +++ b/IPython/parallel/client/asyncresult.py @@ -83,7 +83,7 @@ class AsyncResult(object): self._tracker = tracker self._ready = False self._success = None - self._metadata = None + self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] if len(msg_ids) == 1: self._single_result = not isinstance(targets, (list, tuple)) else: @@ -126,6 +126,10 @@ class AsyncResult(object): else: raise error.TimeoutError("Result not ready.") + def _check_ready(self): + if not self.ready(): + raise error.TimeoutError("Result not ready.") + def ready(self): """Return whether the call has completed.""" if not self._ready: @@ -157,9 +161,8 @@ class AsyncResult(object): else: self._success = True finally: - self._metadata = map(self._client.metadata.get, self.msg_ids) + self._wait_for_outputs(10) - def successful(self): @@ -192,14 +195,13 @@ class AsyncResult(object): @property def result(self): - """result property wrapper for `get(timeout=0)`.""" + """result property wrapper for `get(timeout=-1)`.""" return self.get() # abbreviated alias: r = result @property - @check_ready def metadata(self): """property for accessing execution metadata.""" if self._single_result: @@ -238,15 +240,17 @@ class AsyncResult(object): # dict-access #------------------------------------- - @check_ready def __getitem__(self, key): """getitem returns result value(s) if keyed by int/slice, or metadata if key is str. 
""" if isinstance(key, int): + self._check_ready() return error.collect_exceptions([self._result[key]], self._fname)[0] elif isinstance(key, slice): + self._check_ready() return error.collect_exceptions(self._result[key], self._fname) elif isinstance(key, basestring): + # metadata proxy *does not* require that results are done values = [ md[key] for md in self._metadata ] if self._single_result: return values[0] diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 79d54a4..4571023 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -49,6 +49,7 @@ from IPython.parallel import error from IPython.parallel import util from IPython.zmq.session import Session, Message +from IPython.zmq import serialize from .asyncresult import AsyncResult, AsyncHubResult from .view import DirectView, LoadBalancedView @@ -184,6 +185,7 @@ class Metadata(dict): 'stdout' : '', 'stderr' : '', 'outputs' : [], + 'data': {}, 'outputs_ready' : False, } self.update(md) @@ -768,7 +770,7 @@ class Client(HasTraits): # construct result: if content['status'] == 'ok': - self.results[msg_id] = util.unserialize_object(msg['buffers'])[0] + self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0] elif content['status'] == 'aborted': self.results[msg_id] = error.TaskAborted(msg_id) elif content['status'] == 'resubmitted': @@ -864,6 +866,9 @@ class Client(HasTraits): md['outputs'].append(content) elif msg_type == 'pyout': md['pyout'] = content + elif msg_type == 'data_message': + data, remainder = serialize.unserialize_object(msg['buffers']) + md['data'].update(data) elif msg_type == 'status': # idle message comes after all outputs if content['execution_state'] == 'idle': @@ -1209,7 +1214,7 @@ class Client(HasTraits): if not isinstance(metadata, dict): raise TypeError("metadata must be dict, not %s"%type(metadata)) - bufs = util.pack_apply_message(f, args, kwargs, + bufs = serialize.pack_apply_message(f, args, kwargs, 
buffer_threshold=self.session.buffer_threshold, item_threshold=self.session.item_threshold, ) @@ -1538,7 +1543,7 @@ class Client(HasTraits): if rcontent['status'] == 'ok': if header['msg_type'] == 'apply_reply': - res,buffers = util.unserialize_object(buffers) + res,buffers = serialize.unserialize_object(buffers) elif header['msg_type'] == 'execute_reply': res = ExecuteReply(msg_id, rcontent, md) else: diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 5782a5e..54eea28 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -861,6 +861,8 @@ class Hub(SessionFactory): d[msg_type] = content elif msg_type == 'status': pass + elif msg_type == 'data_pub': + self.log.info("ignored data_pub message for %s" % msg_id) else: self.log.warn("unhandled iopub msg_type: %r", msg_type) diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index e811629..9ffeeef 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -81,13 +81,13 @@ class AsyncResultTest(ClusterTestCase): def test_getattr(self): ar = self.client[:].apply_async(wait, 0.5) + self.assertEqual(ar.engine_id, [None] * len(ar)) self.assertRaises(AttributeError, lambda : ar._foo) self.assertRaises(AttributeError, lambda : ar.__length_hint__()) self.assertRaises(AttributeError, lambda : ar.foo) - self.assertRaises(AttributeError, lambda : ar.engine_id) self.assertFalse(hasattr(ar, '__length_hint__')) self.assertFalse(hasattr(ar, 'foo')) - self.assertFalse(hasattr(ar, 'engine_id')) + self.assertTrue(hasattr(ar, 'engine_id')) ar.get(5) self.assertRaises(AttributeError, lambda : ar._foo) self.assertRaises(AttributeError, lambda : ar.__length_hint__()) @@ -100,8 +100,8 @@ class AsyncResultTest(ClusterTestCase): def test_getitem(self): ar = self.client[:].apply_async(wait, 0.5) - self.assertRaises(TimeoutError, lambda : ar['foo']) - 
self.assertRaises(TimeoutError, lambda : ar['engine_id']) + self.assertEqual(ar['engine_id'], [None] * len(ar)) + self.assertRaises(KeyError, lambda : ar['foo']) ar.get(5) self.assertRaises(KeyError, lambda : ar['foo']) self.assertTrue(isinstance(ar['engine_id'], list)) @@ -109,8 +109,8 @@ class AsyncResultTest(ClusterTestCase): def test_single_result(self): ar = self.client[-1].apply_async(wait, 0.5) - self.assertRaises(TimeoutError, lambda : ar['foo']) - self.assertRaises(TimeoutError, lambda : ar['engine_id']) + self.assertRaises(KeyError, lambda : ar['foo']) + self.assertEqual(ar['engine_id'], None) self.assertTrue(ar.get(5) == 0.5) self.assertTrue(isinstance(ar['engine_id'], int)) self.assertTrue(isinstance(ar.engine_id, int)) diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py index cc88799..866caf3 100644 --- a/IPython/parallel/tests/test_view.py +++ b/IPython/parallel/tests/test_view.py @@ -606,4 +606,25 @@ class TestView(ClusterTestCase, ParametricTestCase): ar = view.apply_async(bar) r = ar.get(10) self.assertEquals(r, 'foo') + def test_data_pub_single(self): + view = self.client[-1] + ar = view.execute('\n'.join([ + 'from IPython.zmq.datapub import publish_data', + 'for i in range(5):', + ' publish_data(dict(i=i))' + ]), block=False) + self.assertTrue(isinstance(ar.data, dict)) + ar.get(5) + self.assertEqual(ar.data, dict(i=4)) + + def test_data_pub(self): + view = self.client[:] + ar = view.execute('\n'.join([ + 'from IPython.zmq.datapub import publish_data', + 'for i in range(5):', + ' publish_data(dict(i=i))' + ]), block=False) + self.assertTrue(all(isinstance(d, dict) for d in ar.data)) + ar.get(5) + self.assertEqual(ar.data, [dict(i=4)] * len(ar)) diff --git a/IPython/zmq/datapub.py b/IPython/zmq/datapub.py new file mode 100644 index 0000000..37d696d --- /dev/null +++ b/IPython/zmq/datapub.py @@ -0,0 +1,71 @@ +"""Publishing +""" + +#----------------------------------------------------------------------------- +# 
Copyright (C) 2012 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from IPython.config import Configurable + +from IPython.utils.jsonutil import json_clean +from IPython.utils.traitlets import Instance, Dict, CBytes + +from IPython.zmq.serialize import serialize_object +from IPython.zmq.session import Session, extract_header + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + + +class ZMQDataPublisher(Configurable): + + topic = topic = CBytes(b'datapub') + session = Instance(Session) + pub_socket = Instance('zmq.Socket') + parent_header = Dict({}) + + def set_parent(self, parent): + """Set the parent for outbound messages.""" + self.parent_header = extract_header(parent) + + def publish_data(self, data): + """publish a data_message on the IOPub channel + + Parameters + ---------- + + data : dict + The data to be published. Think of it as a namespace. + """ + session = self.session + buffers = serialize_object(data, + buffer_threshold=session.buffer_threshold, + item_threshold=session.item_threshold, + ) + content = json_clean(dict(keys=data.keys())) + session.send(self.pub_socket, 'data_message', content=content, + parent=self.parent_header, + buffers=buffers, + ident=self.topic, + ) + + +def publish_data(data): + """publish a data_message on the IOPub channel + + Parameters + ---------- + + data : dict + The data to be published. Think of it as a namespace. 
+ """ + from IPython.zmq.zmqshell import ZMQInteractiveShell + ZMQInteractiveShell.instance().data_pub.publish_data(data) diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 2f756ff..9b7309e 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -147,6 +147,8 @@ class Kernel(Configurable): self.shell.displayhook.topic = self._topic('pyout') self.shell.display_pub.session = self.session self.shell.display_pub.pub_socket = self.iopub_socket + self.shell.data_pub.session = self.session + self.shell.data_pub.pub_socket = self.iopub_socket # TMP - hack while developing self.shell._reply_content = None @@ -353,6 +355,7 @@ class Kernel(Configurable): # Set the parent message of the display hook and out streams. shell.displayhook.set_parent(parent) shell.display_pub.set_parent(parent) + shell.data_pub.set_parent(parent) sys.stdout.set_parent(parent) sys.stderr.set_parent(parent) @@ -538,6 +541,7 @@ class Kernel(Configurable): shell = self.shell shell.displayhook.set_parent(parent) shell.display_pub.set_parent(parent) + shell.data_pub.set_parent(parent) sys.stdout.set_parent(parent) sys.stderr.set_parent(parent) diff --git a/IPython/zmq/zmqshell.py b/IPython/zmq/zmqshell.py index 0691ef6..48675b8 100644 --- a/IPython/zmq/zmqshell.py +++ b/IPython/zmq/zmqshell.py @@ -44,6 +44,7 @@ from IPython.utils import py3compat from IPython.utils.traitlets import Instance, Type, Dict, CBool, CBytes from IPython.utils.warn import warn, error from IPython.zmq.displayhook import ZMQShellDisplayHook +from IPython.zmq.datapub import ZMQDataPublisher from IPython.zmq.session import extract_header from session import Session @@ -464,6 +465,7 @@ class ZMQInteractiveShell(InteractiveShell): displayhook_class = Type(ZMQShellDisplayHook) display_pub_class = Type(ZMQDisplayPublisher) + data_pub_class = Type(ZMQDataPublisher) # Override the traitlet in the parent class, because there's no point using # readline for the kernel. 
Can be removed when the readline code is moved diff --git a/docs/source/development/messaging.txt b/docs/source/development/messaging.txt index d723b89..a46bce5 100644 --- a/docs/source/development/messaging.txt +++ b/docs/source/development/messaging.txt @@ -756,13 +756,51 @@ Message type: ``display_data``:: # The data dict contains key/value pairs, where the kids are MIME # types and the values are the raw data of the representation in that # format. The data dict must minimally contain the ``text/plain`` - # MIME type which is used as a backup representation. + # MIME type which is used as a backup representation. 'data' : dict, # Any metadata that describes the data 'metadata' : dict } + +Raw Data Publication +-------------------- + +``display_data`` lets you publish *representations* of data, such as images and html. +This ``data_pub`` message lets you publish *actual raw data*, sent via message buffers. + +data_pub messages are constructed via the :func:`IPython.zmq.datapub.publish_data` function: + +.. sourcecode:: python + + from IPython.zmq.datapub import publish_data + ns = dict(x=my_array) + publish_data(ns) + + +Message type: ``data_pub``:: + + content = { + # the keys of the data dict, after it has been unserialized + keys = ['a', 'b'] + } + # the namespace dict will be serialized in the message buffers, + # which will have a length of at least one + buffers = ['pdict', ...] + + +The interpretation of a sequence of data_pub messages for a given parent request should be +to update a single namespace with subsequent results. + +.. note:: + + No frontends directly handle data_pub messages at this time. + It is currently only used by the client/engines in :mod:`IPython.parallel`, + where engines may publish *data* to the Client, + of which the Client can then publish *representations* via ``display_data`` + to various frontends.
+ Python inputs ------------- diff --git a/docs/source/development/parallel_messages.txt b/docs/source/development/parallel_messages.txt index 50a02bc..b51d2bc 100644 --- a/docs/source/development/parallel_messages.txt +++ b/docs/source/development/parallel_messages.txt @@ -30,7 +30,7 @@ large blocking requests or database actions in the Hub do not have the ability t job submission and results. Registration (``ROUTER``) -*********************** +************************* The first function of the Hub is to facilitate and monitor connections of clients and engines. Both client and engine registration are handled by the same socket, so only @@ -111,7 +111,7 @@ Message type : ``unregistration_notification``:: Client Queries (``ROUTER``) -************************* +*************************** The hub monitors and logs all queue traffic, so that clients can retrieve past results or monitor pending tasks. This information may reside in-memory on the Hub, or @@ -242,8 +242,8 @@ Message type: ``task_destination``:: 'engine_id' : '1234-abcd-...', # the destination engine's zmq.IDENTITY } -:func:`apply` and :func:`apply_bound` -************************************* +:func:`apply` +************* In terms of message classes, the MUX scheduler and Task scheduler relay the exact same message types. Their only difference lies in how the destination is selected. @@ -260,18 +260,17 @@ code string, must be able to send arbitrary (pickleable) Python objects. And ide as little data as we can. The `buffers` property of a Message was introduced for this purpose. -Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a +Utility method :func:`build_apply_message` in :mod:`IPython.zmq.serialize` wraps a function signature and builds a sendable buffer format for minimal data copying (exactly zero copies of numpy array data or buffers or large strings). 
Message type: ``apply_request``:: - content = { - 'bound' : True, # whether to execute in the engine's namespace or unbound + metadata = { 'after' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() 'follow' : ['msg_id',...], # list of msg_ids or output of Dependency.as_dict() - } + content = {} buffers = ['...'] # at least 3 in length # as built by build_apply_message(f,args,kwargs) @@ -360,8 +359,8 @@ Split Sends Previously, messages were bundled as a single json object and one call to :func:`socket.send_json`. Since the hub inspects all messages, and doesn't need to see the content of the messages, which can be large, messages are now serialized and sent in -pieces. All messages are sent in at least 3 parts: the header, the parent header, and the -content. This allows the controller to unpack and inspect the (always small) header, +pieces. All messages are sent in at least 4 parts: the header, the parent header, the metadata and the content. +This allows the controller to unpack and inspect the (always small) header, without spending time unpacking the content unless the message is bound for the controller. Buffers are added on to the end of the message, and can be any objects that present the buffer interface.