##// END OF EJS Templates
enforce ascii identities in parallel code...
MinRK -
Show More
@@ -46,7 +46,7 b' class Heart(object):'
46 if in_type == zmq.SUB:
46 if in_type == zmq.SUB:
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
48 if heart_id is None:
48 if heart_id is None:
49 heart_id = ensure_bytes(uuid.uuid4())
49 heart_id = uuid.uuid4().bytes
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
51 self.id = heart_id
51 self.id = heart_id
52
52
@@ -563,8 +563,8 b' class Hub(SessionFactory):'
563 record = init_record(msg)
563 record = init_record(msg)
564 msg_id = record['msg_id']
564 msg_id = record['msg_id']
565 # Unicode in records
565 # Unicode in records
566 record['engine_uuid'] = queue_id.decode('utf8', 'replace')
566 record['engine_uuid'] = queue_id.decode('ascii')
567 record['client_uuid'] = client_id.decode('utf8', 'replace')
567 record['client_uuid'] = client_id.decode('ascii')
568 record['queue'] = 'mux'
568 record['queue'] = 'mux'
569
569
570 try:
570 try:
@@ -834,7 +834,7 b' class Hub(SessionFactory):'
834 jsonable = {}
834 jsonable = {}
835 for k,v in self.keytable.iteritems():
835 for k,v in self.keytable.iteritems():
836 if v not in self.dead_engines:
836 if v not in self.dead_engines:
837 jsonable[str(k)] = v.decode()
837 jsonable[str(k)] = v.decode('ascii')
838 content['engines'] = jsonable
838 content['engines'] = jsonable
839 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
840
840
@@ -503,7 +503,7 b' class TaskScheduler(SessionFactory):'
503 self.add_job(idx)
503 self.add_job(idx)
504 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
504 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
505 # notify Hub
505 # notify Hub
506 content = dict(msg_id=msg_id, engine_id=target.decode())
506 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
507 self.session.send(self.mon_stream, 'task_destination', content=content,
507 self.session.send(self.mon_stream, 'task_destination', content=content,
508 ident=[b'tracktask',self.ident])
508 ident=[b'tracktask',self.ident])
509
509
@@ -23,7 +23,7 b' import zmq'
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 # internal
25 # internal
26 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode
26 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
27 # from IPython.utils.localinterfaces import LOCALHOST
27 # from IPython.utils.localinterfaces import LOCALHOST
28
28
29 from IPython.parallel.controller.heartmonitor import Heart
29 from IPython.parallel.controller.heartmonitor import Heart
@@ -58,6 +58,11 b' class EngineFactory(RegistrationFactory):'
58 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
58 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
59 kernel=Instance(Kernel)
59 kernel=Instance(Kernel)
60
60
61 bident = CBytes()
62 ident = Unicode()
63 def _ident_changed(self, name, old, new):
64 self.bident = ensure_bytes(new)
65
61
66
62 def __init__(self, **kwargs):
67 def __init__(self, **kwargs):
63 super(EngineFactory, self).__init__(**kwargs)
68 super(EngineFactory, self).__init__(**kwargs)
@@ -65,7 +70,7 b' class EngineFactory(RegistrationFactory):'
65 ctx = self.context
70 ctx = self.context
66
71
67 reg = ctx.socket(zmq.XREQ)
72 reg = ctx.socket(zmq.XREQ)
68 reg.setsockopt(zmq.IDENTITY, ensure_bytes(self.ident))
73 reg.setsockopt(zmq.IDENTITY, self.bident)
69 reg.connect(self.url)
74 reg.connect(self.url)
70 self.registrar = zmqstream.ZMQStream(reg, self.loop)
75 self.registrar = zmqstream.ZMQStream(reg, self.loop)
71
76
@@ -83,8 +88,7 b' class EngineFactory(RegistrationFactory):'
83 self._abort_dc.stop()
88 self._abort_dc.stop()
84 ctx = self.context
89 ctx = self.context
85 loop = self.loop
90 loop = self.loop
86 identity = ensure_bytes(self.ident)
91 identity = self.bident
87
88 idents,msg = self.session.feed_identities(msg)
92 idents,msg = self.session.feed_identities(msg)
89 msg = Message(self.session.unpack_message(msg))
93 msg = Message(self.session.unpack_message(msg))
90
94
@@ -139,7 +143,7 b' class EngineFactory(RegistrationFactory):'
139 if self.display_hook_factory:
143 if self.display_hook_factory:
140 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
144 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
141 sys.displayhook.topic = 'engine.%i.pyout'%self.id
145 sys.displayhook.topic = 'engine.%i.pyout'%self.id
142
146
143 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
147 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
144 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
148 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
145 loop=loop, user_ns = self.user_ns, log=self.log)
149 loop=loop, user_ns = self.user_ns, log=self.log)
@@ -102,9 +102,9 b' class ReverseDict(dict):'
102 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
103
103
104 def ensure_bytes(s):
104 def ensure_bytes(s):
105 """ensure that an object is bytes"""
105 """ensure that an object is ascii bytes"""
106 if isinstance(s, unicode):
106 if isinstance(s, unicode):
107 s = s.encode(sys.getdefaultencoding(), 'replace')
107 s = s.encode('ascii')
108 return s
108 return s
109
109
110 def validate_url(url):
110 def validate_url(url):
General Comments 0
You need to be logged in to leave comments. Login now