Show More
@@ -46,7 +46,7 b' class Heart(object):' | |||||
46 | if in_type == zmq.SUB: |
|
46 | if in_type == zmq.SUB: | |
47 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") |
|
47 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") | |
48 | if heart_id is None: |
|
48 | if heart_id is None: | |
49 |
heart_id = |
|
49 | heart_id = uuid.uuid4().bytes | |
50 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) |
|
50 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) | |
51 | self.id = heart_id |
|
51 | self.id = heart_id | |
52 |
|
52 |
@@ -563,8 +563,8 b' class Hub(SessionFactory):' | |||||
563 | record = init_record(msg) |
|
563 | record = init_record(msg) | |
564 | msg_id = record['msg_id'] |
|
564 | msg_id = record['msg_id'] | |
565 | # Unicode in records |
|
565 | # Unicode in records | |
566 |
record['engine_uuid'] = queue_id.decode(' |
|
566 | record['engine_uuid'] = queue_id.decode('ascii') | |
567 |
record['client_uuid'] = client_id.decode(' |
|
567 | record['client_uuid'] = client_id.decode('ascii') | |
568 | record['queue'] = 'mux' |
|
568 | record['queue'] = 'mux' | |
569 |
|
569 | |||
570 | try: |
|
570 | try: | |
@@ -834,7 +834,7 b' class Hub(SessionFactory):' | |||||
834 | jsonable = {} |
|
834 | jsonable = {} | |
835 | for k,v in self.keytable.iteritems(): |
|
835 | for k,v in self.keytable.iteritems(): | |
836 | if v not in self.dead_engines: |
|
836 | if v not in self.dead_engines: | |
837 | jsonable[str(k)] = v.decode() |
|
837 | jsonable[str(k)] = v.decode('ascii') | |
838 | content['engines'] = jsonable |
|
838 | content['engines'] = jsonable | |
839 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
839 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | |
840 |
|
840 |
@@ -503,7 +503,7 b' class TaskScheduler(SessionFactory):' | |||||
503 | self.add_job(idx) |
|
503 | self.add_job(idx) | |
504 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) |
|
504 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) | |
505 | # notify Hub |
|
505 | # notify Hub | |
506 | content = dict(msg_id=msg_id, engine_id=target.decode()) |
|
506 | content = dict(msg_id=msg_id, engine_id=target.decode('ascii')) | |
507 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
507 | self.session.send(self.mon_stream, 'task_destination', content=content, | |
508 | ident=[b'tracktask',self.ident]) |
|
508 | ident=[b'tracktask',self.ident]) | |
509 |
|
509 |
@@ -23,7 +23,7 b' import zmq' | |||||
23 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
24 |
|
24 | |||
25 | # internal |
|
25 | # internal | |
26 | from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode |
|
26 | from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes | |
27 | # from IPython.utils.localinterfaces import LOCALHOST |
|
27 | # from IPython.utils.localinterfaces import LOCALHOST | |
28 |
|
28 | |||
29 | from IPython.parallel.controller.heartmonitor import Heart |
|
29 | from IPython.parallel.controller.heartmonitor import Heart | |
@@ -58,6 +58,11 b' class EngineFactory(RegistrationFactory):' | |||||
58 | registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
58 | registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') | |
59 | kernel=Instance(Kernel) |
|
59 | kernel=Instance(Kernel) | |
60 |
|
60 | |||
|
61 | bident = CBytes() | |||
|
62 | ident = Unicode() | |||
|
63 | def _ident_changed(self, name, old, new): | |||
|
64 | self.bident = ensure_bytes(new) | |||
|
65 | ||||
61 |
|
66 | |||
62 | def __init__(self, **kwargs): |
|
67 | def __init__(self, **kwargs): | |
63 | super(EngineFactory, self).__init__(**kwargs) |
|
68 | super(EngineFactory, self).__init__(**kwargs) | |
@@ -65,7 +70,7 b' class EngineFactory(RegistrationFactory):' | |||||
65 | ctx = self.context |
|
70 | ctx = self.context | |
66 |
|
71 | |||
67 | reg = ctx.socket(zmq.XREQ) |
|
72 | reg = ctx.socket(zmq.XREQ) | |
68 |
reg.setsockopt(zmq.IDENTITY, |
|
73 | reg.setsockopt(zmq.IDENTITY, self.bident) | |
69 | reg.connect(self.url) |
|
74 | reg.connect(self.url) | |
70 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
75 | self.registrar = zmqstream.ZMQStream(reg, self.loop) | |
71 |
|
76 | |||
@@ -83,8 +88,7 b' class EngineFactory(RegistrationFactory):' | |||||
83 | self._abort_dc.stop() |
|
88 | self._abort_dc.stop() | |
84 | ctx = self.context |
|
89 | ctx = self.context | |
85 | loop = self.loop |
|
90 | loop = self.loop | |
86 |
identity = |
|
91 | identity = self.bident | |
87 |
|
||||
88 | idents,msg = self.session.feed_identities(msg) |
|
92 | idents,msg = self.session.feed_identities(msg) | |
89 | msg = Message(self.session.unpack_message(msg)) |
|
93 | msg = Message(self.session.unpack_message(msg)) | |
90 |
|
94 | |||
@@ -139,7 +143,7 b' class EngineFactory(RegistrationFactory):' | |||||
139 | if self.display_hook_factory: |
|
143 | if self.display_hook_factory: | |
140 | sys.displayhook = self.display_hook_factory(self.session, iopub_stream) |
|
144 | sys.displayhook = self.display_hook_factory(self.session, iopub_stream) | |
141 | sys.displayhook.topic = 'engine.%i.pyout'%self.id |
|
145 | sys.displayhook.topic = 'engine.%i.pyout'%self.id | |
142 |
|
146 | |||
143 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, |
|
147 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, | |
144 | control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, |
|
148 | control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, | |
145 | loop=loop, user_ns = self.user_ns, log=self.log) |
|
149 | loop=loop, user_ns = self.user_ns, log=self.log) |
@@ -102,9 +102,9 b' class ReverseDict(dict):' | |||||
102 | #----------------------------------------------------------------------------- |
|
102 | #----------------------------------------------------------------------------- | |
103 |
|
103 | |||
104 | def ensure_bytes(s): |
|
104 | def ensure_bytes(s): | |
105 | """ensure that an object is bytes""" |
|
105 | """ensure that an object is ascii bytes""" | |
106 | if isinstance(s, unicode): |
|
106 | if isinstance(s, unicode): | |
107 |
s = s.encode( |
|
107 | s = s.encode('ascii') | |
108 | return s |
|
108 | return s | |
109 |
|
109 | |||
110 | def validate_url(url): |
|
110 | def validate_url(url): |
General Comments 0
You need to be logged in to leave comments.
Login now