diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index f313329..4aa137a 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -854,15 +854,19 @@ class Client(HasTraits): if self.debug: pprint(msg) parent = msg['parent_header'] - # ignore IOPub messages with no parent. - # Caused by print statements or warnings from before the first execution. - if not parent: + if not parent or parent['session'] != self.session.session: + # ignore IOPub messages not from here idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) continue msg_id = parent['msg_id'] content = msg['content'] header = msg['header'] msg_type = msg['header']['msg_type'] + + if msg_type == 'status' and msg_id not in self.metadata: + # ignore status messages if they aren't mine + idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) + continue # init metadata: md = self.metadata[msg_id] diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 51e70f1..40e55cc 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -850,19 +850,18 @@ class Hub(SessionFactory): msg_id = parent['msg_id'] msg_type = msg['header']['msg_type'] content = msg['content'] - + # ensure msg_id is in db try: rec = self.db.get_record(msg_id) except KeyError: - rec = empty_record() - rec['msg_id'] = msg_id - self.db.add_record(msg_id, rec) + rec = None + # stream d = {} if msg_type == 'stream': name = content['name'] - s = rec[name] or '' + s = '' if rec is None else rec[name] d[name] = s + content['data'] elif msg_type == 'error': @@ -880,9 +879,19 @@ class Hub(SessionFactory): if not d: return - + + if rec is None: + # new record + rec = empty_record() + rec['msg_id'] = msg_id + rec.update(d) + d = rec + update_record = self.db.add_record + else: + update_record = self.db.update_record + try: - self.db.update_record(msg_id, d) + update_record(msg_id, d) except Exception: self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)