Show More
@@ -370,7 +370,7 b' class Client(HasTraits):' | |||||
370 | self.session = Session(**extra_args) |
|
370 | self.session = Session(**extra_args) | |
371 |
|
371 | |||
372 | self._query_socket = self._context.socket(zmq.DEALER) |
|
372 | self._query_socket = self._context.socket(zmq.DEALER) | |
373 |
self._query_socket.setsockopt(zmq.IDENTITY, |
|
373 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
374 | if self._ssh: |
|
374 | if self._ssh: | |
375 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) |
|
375 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) | |
376 | else: |
|
376 | else: | |
@@ -496,7 +496,7 b' class Client(HasTraits):' | |||||
496 | content = msg.content |
|
496 | content = msg.content | |
497 | self._config['registration'] = dict(content) |
|
497 | self._config['registration'] = dict(content) | |
498 | if content.status == 'ok': |
|
498 | if content.status == 'ok': | |
499 |
ident = |
|
499 | ident = self.session.bsession | |
500 | if content.mux: |
|
500 | if content.mux: | |
501 | self._mux_socket = self._context.socket(zmq.DEALER) |
|
501 | self._mux_socket = self._context.socket(zmq.DEALER) | |
502 | self._mux_socket.setsockopt(zmq.IDENTITY, ident) |
|
502 | self._mux_socket.setsockopt(zmq.IDENTITY, ident) | |
@@ -512,7 +512,7 b' class Client(HasTraits):' | |||||
512 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') |
|
512 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
513 | # if content.query: |
|
513 | # if content.query: | |
514 | # self._query_socket = self._context.socket(zmq.DEALER) |
|
514 | # self._query_socket = self._context.socket(zmq.DEALER) | |
515 | # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
515 | # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
516 | # connect_socket(self._query_socket, content.query) |
|
516 | # connect_socket(self._query_socket, content.query) | |
517 | if content.control: |
|
517 | if content.control: | |
518 | self._control_socket = self._context.socket(zmq.DEALER) |
|
518 | self._control_socket = self._context.socket(zmq.DEALER) |
@@ -288,7 +288,7 b' class HubFactory(RegistrationFactory):' | |||||
288 | # resubmit stream |
|
288 | # resubmit stream | |
289 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) |
|
289 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) | |
290 | url = util.disambiguate_url(self.client_info['task'][-1]) |
|
290 | url = util.disambiguate_url(self.client_info['task'][-1]) | |
291 |
r.setsockopt(zmq.IDENTITY, |
|
291 | r.setsockopt(zmq.IDENTITY, self.session.bsession) | |
292 | r.connect(url) |
|
292 | r.connect(url) | |
293 |
|
293 | |||
294 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
294 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
@@ -177,7 +177,7 b' class TaskScheduler(SessionFactory):' | |||||
177 | ident = CBytes() # ZMQ identity. This should just be self.session.session |
|
177 | ident = CBytes() # ZMQ identity. This should just be self.session.session | |
178 | # but ensure Bytes |
|
178 | # but ensure Bytes | |
179 | def _ident_default(self): |
|
179 | def _ident_default(self): | |
180 |
return |
|
180 | return self.session.bsession | |
181 |
|
181 | |||
182 | def start(self): |
|
182 | def start(self): | |
183 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
183 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
@@ -188,7 +188,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
188 | def run(self): |
|
188 | def run(self): | |
189 | """The thread's main activity. Call start() instead.""" |
|
189 | """The thread's main activity. Call start() instead.""" | |
190 | self.socket = self.context.socket(zmq.DEALER) |
|
190 | self.socket = self.context.socket(zmq.DEALER) | |
191 |
self.socket.setsockopt(zmq.IDENTITY, self.session.session |
|
191 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
192 | self.socket.connect('tcp://%s:%i' % self.address) |
|
192 | self.socket.connect('tcp://%s:%i' % self.address) | |
193 | self.iostate = POLLERR|POLLIN |
|
193 | self.iostate = POLLERR|POLLIN | |
194 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
194 | self.ioloop.add_handler(self.socket, self._handle_events, | |
@@ -395,7 +395,7 b' class SubSocketChannel(ZMQSocketChannel):' | |||||
395 | """The thread's main activity. Call start() instead.""" |
|
395 | """The thread's main activity. Call start() instead.""" | |
396 | self.socket = self.context.socket(zmq.SUB) |
|
396 | self.socket = self.context.socket(zmq.SUB) | |
397 | self.socket.setsockopt(zmq.SUBSCRIBE,b'') |
|
397 | self.socket.setsockopt(zmq.SUBSCRIBE,b'') | |
398 |
self.socket.setsockopt(zmq.IDENTITY, self.session.session |
|
398 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
399 | self.socket.connect('tcp://%s:%i' % self.address) |
|
399 | self.socket.connect('tcp://%s:%i' % self.address) | |
400 | self.iostate = POLLIN|POLLERR |
|
400 | self.iostate = POLLIN|POLLERR | |
401 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
401 | self.ioloop.add_handler(self.socket, self._handle_events, | |
@@ -483,7 +483,7 b' class StdInSocketChannel(ZMQSocketChannel):' | |||||
483 | def run(self): |
|
483 | def run(self): | |
484 | """The thread's main activity. Call start() instead.""" |
|
484 | """The thread's main activity. Call start() instead.""" | |
485 | self.socket = self.context.socket(zmq.DEALER) |
|
485 | self.socket = self.context.socket(zmq.DEALER) | |
486 |
self.socket.setsockopt(zmq.IDENTITY, self.session.session |
|
486 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
487 | self.socket.connect('tcp://%s:%i' % self.address) |
|
487 | self.socket.connect('tcp://%s:%i' % self.address) | |
488 | self.iostate = POLLERR|POLLIN |
|
488 | self.iostate = POLLERR|POLLIN | |
489 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
489 | self.ioloop.add_handler(self.socket, self._handle_events, | |
@@ -562,7 +562,7 b' class HBSocketChannel(ZMQSocketChannel):' | |||||
562 |
|
562 | |||
563 | def _create_socket(self): |
|
563 | def _create_socket(self): | |
564 | self.socket = self.context.socket(zmq.REQ) |
|
564 | self.socket = self.context.socket(zmq.REQ) | |
565 |
self.socket.setsockopt(zmq.IDENTITY, self.session.session |
|
565 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
566 | self.socket.connect('tcp://%s:%i' % self.address) |
|
566 | self.socket.connect('tcp://%s:%i' % self.address) | |
567 | self.poller = zmq.Poller() |
|
567 | self.poller = zmq.Poller() | |
568 | self.poller.register(self.socket, zmq.POLLIN) |
|
568 | self.poller.register(self.socket, zmq.POLLIN) |
@@ -242,7 +242,15 b' class Session(Configurable):' | |||||
242 | session = CUnicode(u'', config=True, |
|
242 | session = CUnicode(u'', config=True, | |
243 | help="""The UUID identifying this session.""") |
|
243 | help="""The UUID identifying this session.""") | |
244 | def _session_default(self): |
|
244 | def _session_default(self): | |
245 |
|
|
245 | u = unicode(uuid.uuid4()) | |
|
246 | self.bsession = u.encode('ascii') | |||
|
247 | return u | |||
|
248 | ||||
|
249 | def _session_changed(self, name, old, new): | |||
|
250 | self.bsession = self.session.encode('ascii') | |||
|
251 | ||||
|
252 | # bsession is the session as bytes | |||
|
253 | bsession = CBytes(b'') | |||
246 |
|
254 | |||
247 | username = Unicode(os.environ.get('USER',u'username'), config=True, |
|
255 | username = Unicode(os.environ.get('USER',u'username'), config=True, | |
248 | help="""Username for the Session. Default is your system username.""") |
|
256 | help="""Username for the Session. Default is your system username.""") | |
@@ -296,9 +304,11 b' class Session(Configurable):' | |||||
296 | pack/unpack : callables |
|
304 | pack/unpack : callables | |
297 | You can also set the pack/unpack callables for serialization |
|
305 | You can also set the pack/unpack callables for serialization | |
298 | directly. |
|
306 | directly. | |
299 |
session : |
|
307 | session : unicode (must be ascii) | |
300 | the ID of this Session object. The default is to generate a new |
|
308 | the ID of this Session object. The default is to generate a new | |
301 | UUID. |
|
309 | UUID. | |
|
310 | bsession : bytes | |||
|
311 | The session as bytes | |||
302 | username : unicode |
|
312 | username : unicode | |
303 | username added to message headers. The default is to ask the OS. |
|
313 | username added to message headers. The default is to ask the OS. | |
304 | key : bytes |
|
314 | key : bytes | |
@@ -311,6 +321,8 b' class Session(Configurable):' | |||||
311 | super(Session, self).__init__(**kwargs) |
|
321 | super(Session, self).__init__(**kwargs) | |
312 | self._check_packers() |
|
322 | self._check_packers() | |
313 | self.none = self.pack({}) |
|
323 | self.none = self.pack({}) | |
|
324 | # ensure self._session_default() if necessary, so bsession is defined: | |||
|
325 | self.session | |||
314 |
|
326 | |||
315 | @property |
|
327 | @property | |
316 | def msg_id(self): |
|
328 | def msg_id(self): |
@@ -185,4 +185,26 b' class TestSession(SessionTestCase):' | |||||
185 | content = dict(code='whoda',stuff=object()) |
|
185 | content = dict(code='whoda',stuff=object()) | |
186 | themsg = self.session.msg('execute',content=content) |
|
186 | themsg = self.session.msg('execute',content=content) | |
187 | pmsg = theids |
|
187 | pmsg = theids | |
|
188 | ||||
|
189 | def test_session_id(self): | |||
|
190 | session = ss.Session() | |||
|
191 | # get bs before us | |||
|
192 | bs = session.bsession | |||
|
193 | us = session.session | |||
|
194 | self.assertEquals(us.encode('ascii'), bs) | |||
|
195 | session = ss.Session() | |||
|
196 | # get us before bs | |||
|
197 | us = session.session | |||
|
198 | bs = session.bsession | |||
|
199 | self.assertEquals(us.encode('ascii'), bs) | |||
|
200 | # change propagates: | |||
|
201 | session.session = 'something else' | |||
|
202 | bs = session.bsession | |||
|
203 | us = session.session | |||
|
204 | self.assertEquals(us.encode('ascii'), bs) | |||
|
205 | session = ss.Session(session='stuff') | |||
|
206 | # get us before bs | |||
|
207 | self.assertEquals(session.bsession, session.session.encode('ascii')) | |||
|
208 | self.assertEquals(b'stuff', session.bsession) | |||
|
209 | ||||
188 |
|
210 |
General Comments 0
You need to be logged in to leave comments.
Login now