diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index bee991a..4f3acf7 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -370,7 +370,7 @@ class Client(HasTraits): self.session = Session(**extra_args) self._query_socket = self._context.socket(zmq.DEALER) - self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session)) + self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) if self._ssh: tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) else: @@ -496,7 +496,7 @@ class Client(HasTraits): content = msg.content self._config['registration'] = dict(content) if content.status == 'ok': - ident = util.asbytes(self.session.session) + ident = self.session.bsession if content.mux: self._mux_socket = self._context.socket(zmq.DEALER) self._mux_socket.setsockopt(zmq.IDENTITY, ident) @@ -512,7 +512,7 @@ class Client(HasTraits): self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') # if content.query: # self._query_socket = self._context.socket(zmq.DEALER) - # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) + # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) # connect_socket(self._query_socket, content.query) if content.control: self._control_socket = self._context.socket(zmq.DEALER) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index c3fe1c1..9dde8fb 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -288,7 +288,7 @@ class HubFactory(RegistrationFactory): # resubmit stream r = ZMQStream(ctx.socket(zmq.DEALER), loop) url = util.disambiguate_url(self.client_info['task'][-1]) - r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session)) + r.setsockopt(zmq.IDENTITY, self.session.bsession) r.connect(url) self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index b7f63eb..b063123 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -177,7 +177,7 @@ class TaskScheduler(SessionFactory): ident = CBytes() # ZMQ identity. This should just be self.session.session # but ensure Bytes def _ident_default(self): - return asbytes(self.session.session) + return self.session.bsession def start(self): self.engine_stream.on_recv(self.dispatch_result, copy=False) diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index a688d41..841a1b9 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -188,7 +188,7 @@ class ShellSocketChannel(ZMQSocketChannel): def run(self): """The thread's main activity. Call start() instead.""" self.socket = self.context.socket(zmq.DEALER) - self.socket.setsockopt(zmq.IDENTITY, self.session.session.encode("ascii")) + self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) self.socket.connect('tcp://%s:%i' % self.address) self.iostate = POLLERR|POLLIN self.ioloop.add_handler(self.socket, self._handle_events, @@ -395,7 +395,7 @@ class SubSocketChannel(ZMQSocketChannel): """The thread's main activity. Call start() instead.""" self.socket = self.context.socket(zmq.SUB) self.socket.setsockopt(zmq.SUBSCRIBE,b'') - self.socket.setsockopt(zmq.IDENTITY, self.session.session.encode("ascii")) + self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) self.socket.connect('tcp://%s:%i' % self.address) self.iostate = POLLIN|POLLERR self.ioloop.add_handler(self.socket, self._handle_events, @@ -483,7 +483,7 @@ class StdInSocketChannel(ZMQSocketChannel): def run(self): """The thread's main activity. Call start() instead.""" self.socket = self.context.socket(zmq.DEALER) - self.socket.setsockopt(zmq.IDENTITY, self.session.session.encode("ascii")) + self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) self.socket.connect('tcp://%s:%i' % self.address) self.iostate = POLLERR|POLLIN self.ioloop.add_handler(self.socket, self._handle_events, @@ -562,7 +562,7 @@ class HBSocketChannel(ZMQSocketChannel): def _create_socket(self): self.socket = self.context.socket(zmq.REQ) - self.socket.setsockopt(zmq.IDENTITY, self.session.session.encode("ascii")) + self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) self.socket.connect('tcp://%s:%i' % self.address) self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index 55df686..4636f91 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -242,7 +242,15 @@ class Session(Configurable): session = CUnicode(u'', config=True, help="""The UUID identifying this session.""") def _session_default(self): - return unicode(uuid.uuid4()) + u = unicode(uuid.uuid4()) + self.bsession = u.encode('ascii') + return u + + def _session_changed(self, name, old, new): + self.bsession = self.session.encode('ascii') + + # bsession is the session as bytes + bsession = CBytes(b'') username = Unicode(os.environ.get('USER',u'username'), config=True, help="""Username for the Session. Default is your system username.""") @@ -296,9 +304,11 @@ class Session(Configurable): pack/unpack : callables You can also set the pack/unpack callables for serialization directly. - session : bytes + session : unicode (must be ascii) the ID of this Session object. The default is to generate a new UUID. + bsession : bytes + The session as bytes username : unicode username added to message headers. The default is to ask the OS. key : bytes @@ -311,6 +321,8 @@ class Session(Configurable): super(Session, self).__init__(**kwargs) self._check_packers() self.none = self.pack({}) + # ensure self._session_default() if necessary, so bsession is defined: + self.session @property def msg_id(self): diff --git a/IPython/zmq/tests/test_session.py b/IPython/zmq/tests/test_session.py index 42ac635..b2b522c 100644 --- a/IPython/zmq/tests/test_session.py +++ b/IPython/zmq/tests/test_session.py @@ -185,4 +185,26 @@ class TestSession(SessionTestCase): content = dict(code='whoda',stuff=object()) themsg = self.session.msg('execute',content=content) pmsg = theids + + def test_session_id(self): + session = ss.Session() + # get bs before us + bs = session.bsession + us = session.session + self.assertEquals(us.encode('ascii'), bs) + session = ss.Session() + # get us before bs + us = session.session + bs = session.bsession + self.assertEquals(us.encode('ascii'), bs) + # change propagates: + session.session = 'something else' + bs = session.bsession + us = session.session + self.assertEquals(us.encode('ascii'), bs) + session = ss.Session(session='stuff') + # get us before bs + self.assertEquals(session.bsession, session.session.encode('ascii')) + self.assertEquals(b'stuff', session.bsession) +