diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 09903e0..233226c 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -466,7 +466,7 @@ class Client(HasTraits): self.session = Session(**extra_args) self._query_socket = self._context.socket(zmq.DEALER) - self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) + if self._ssh: tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) else: @@ -607,29 +607,21 @@ class Client(HasTraits): ident = self.session.bsession if content.mux: self._mux_socket = self._context.socket(zmq.DEALER) - self._mux_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._mux_socket, content.mux) if content.task: self._task_scheme, task_addr = content.task self._task_socket = self._context.socket(zmq.DEALER) - self._task_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._task_socket, task_addr) if content.notification: self._notification_socket = self._context.socket(zmq.SUB) connect_socket(self._notification_socket, content.notification) self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') - # if content.query: - # self._query_socket = self._context.socket(zmq.DEALER) - # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) - # connect_socket(self._query_socket, content.query) if content.control: self._control_socket = self._context.socket(zmq.DEALER) - self._control_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._control_socket, content.control) if content.iopub: self._iopub_socket = self._context.socket(zmq.SUB) self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') - self._iopub_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._iopub_socket, content.iopub) self._update_engines(dict(content.engines)) else: diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 9a5c6fb..2967c7b 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -585,7 +585,7 @@ class Hub(SessionFactory): self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) # Unicode in records record['engine_uuid'] = queue_id.decode('ascii') - record['client_uuid'] = client_id.decode('ascii') + record['client_uuid'] = msg['header']['session'] record['queue'] = 'mux' try: @@ -677,7 +677,7 @@ class Hub(SessionFactory): return record = init_record(msg) - record['client_uuid'] = client_id.decode('ascii') + record['client_uuid'] = msg['header']['session'] record['queue'] = 'task' header = msg['header'] msg_id = header['msg_id'] @@ -1205,6 +1205,7 @@ class Hub(SessionFactory): self.db.add_record(msg_id, init_record(msg)) except Exception: self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) + return finish(error.wrap_exception()) finish(dict(status='ok', resubmitted=resubmitted))