Show More
@@ -46,7 +46,7 b' class Heart(object):' | |||
|
46 | 46 | if in_type == zmq.SUB: |
|
47 | 47 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") |
|
48 | 48 | if heart_id is None: |
|
49 |
heart_id = |
|
|
49 | heart_id = uuid.uuid4().bytes | |
|
50 | 50 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) |
|
51 | 51 | self.id = heart_id |
|
52 | 52 |
@@ -563,8 +563,8 b' class Hub(SessionFactory):' | |||
|
563 | 563 | record = init_record(msg) |
|
564 | 564 | msg_id = record['msg_id'] |
|
565 | 565 | # Unicode in records |
|
566 |
record['engine_uuid'] = queue_id.decode(' |
|
|
567 |
record['client_uuid'] = client_id.decode(' |
|
|
566 | record['engine_uuid'] = queue_id.decode('ascii') | |
|
567 | record['client_uuid'] = client_id.decode('ascii') | |
|
568 | 568 | record['queue'] = 'mux' |
|
569 | 569 | |
|
570 | 570 | try: |
@@ -834,7 +834,7 b' class Hub(SessionFactory):' | |||
|
834 | 834 | jsonable = {} |
|
835 | 835 | for k,v in self.keytable.iteritems(): |
|
836 | 836 | if v not in self.dead_engines: |
|
837 | jsonable[str(k)] = v.decode() | |
|
837 | jsonable[str(k)] = v.decode('ascii') | |
|
838 | 838 | content['engines'] = jsonable |
|
839 | 839 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
840 | 840 |
@@ -503,7 +503,7 b' class TaskScheduler(SessionFactory):' | |||
|
503 | 503 | self.add_job(idx) |
|
504 | 504 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) |
|
505 | 505 | # notify Hub |
|
506 | content = dict(msg_id=msg_id, engine_id=target.decode()) | |
|
506 | content = dict(msg_id=msg_id, engine_id=target.decode('ascii')) | |
|
507 | 507 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
508 | 508 | ident=[b'tracktask',self.ident]) |
|
509 | 509 |
@@ -23,7 +23,7 b' import zmq' | |||
|
23 | 23 | from zmq.eventloop import ioloop, zmqstream |
|
24 | 24 | |
|
25 | 25 | # internal |
|
26 | from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode | |
|
26 | from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes | |
|
27 | 27 | # from IPython.utils.localinterfaces import LOCALHOST |
|
28 | 28 | |
|
29 | 29 | from IPython.parallel.controller.heartmonitor import Heart |
@@ -58,6 +58,11 b' class EngineFactory(RegistrationFactory):' | |||
|
58 | 58 | registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
59 | 59 | kernel=Instance(Kernel) |
|
60 | 60 | |
|
61 | bident = CBytes() | |
|
62 | ident = Unicode() | |
|
63 | def _ident_changed(self, name, old, new): | |
|
64 | self.bident = ensure_bytes(new) | |
|
65 | ||
|
61 | 66 | |
|
62 | 67 | def __init__(self, **kwargs): |
|
63 | 68 | super(EngineFactory, self).__init__(**kwargs) |
@@ -65,7 +70,7 b' class EngineFactory(RegistrationFactory):' | |||
|
65 | 70 | ctx = self.context |
|
66 | 71 | |
|
67 | 72 | reg = ctx.socket(zmq.XREQ) |
|
68 |
reg.setsockopt(zmq.IDENTITY, |
|
|
73 | reg.setsockopt(zmq.IDENTITY, self.bident) | |
|
69 | 74 | reg.connect(self.url) |
|
70 | 75 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
71 | 76 | |
@@ -83,8 +88,7 b' class EngineFactory(RegistrationFactory):' | |||
|
83 | 88 | self._abort_dc.stop() |
|
84 | 89 | ctx = self.context |
|
85 | 90 | loop = self.loop |
|
86 |
identity = |
|
|
87 | ||
|
91 | identity = self.bident | |
|
88 | 92 | idents,msg = self.session.feed_identities(msg) |
|
89 | 93 | msg = Message(self.session.unpack_message(msg)) |
|
90 | 94 |
@@ -102,9 +102,9 b' class ReverseDict(dict):' | |||
|
102 | 102 | #----------------------------------------------------------------------------- |
|
103 | 103 | |
|
104 | 104 | def ensure_bytes(s): |
|
105 | """ensure that an object is bytes""" | |
|
105 | """ensure that an object is ascii bytes""" | |
|
106 | 106 | if isinstance(s, unicode): |
|
107 |
s = s.encode( |
|
|
107 | s = s.encode('ascii') | |
|
108 | 108 | return s |
|
109 | 109 | |
|
110 | 110 | def validate_url(url): |
General Comments 0
You need to be logged in to leave comments.
Login now