Show More
@@ -45,9 +45,10 b' from IPython.zmq.session import (' | |||
|
45 | 45 | from IPython.config.configurable import Configurable |
|
46 | 46 | |
|
47 | 47 | from IPython.parallel.engine.engine import EngineFactory |
|
48 |
from IPython.parallel.util import disambiguate_url |
|
|
48 | from IPython.parallel.util import disambiguate_url | |
|
49 | 49 | |
|
50 | 50 | from IPython.utils.importstring import import_item |
|
51 | from IPython.utils.py3compat import cast_bytes | |
|
51 | 52 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float |
|
52 | 53 | |
|
53 | 54 | |
@@ -203,7 +204,7 b' class IPEngineApp(BaseParallelApplication):' | |||
|
203 | 204 | d = json.loads(f.read()) |
|
204 | 205 | |
|
205 | 206 | if 'exec_key' in d: |
|
206 | config.Session.key = asbytes(d['exec_key']) | |
|
207 | config.Session.key = cast_bytes(d['exec_key']) | |
|
207 | 208 | |
|
208 | 209 | try: |
|
209 | 210 | config.EngineFactory.location |
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication' | |||
|
36 | 36 | from IPython.utils.jsonutil import rekey |
|
37 | 37 | from IPython.utils.localinterfaces import LOCAL_IPS |
|
38 | 38 | from IPython.utils.path import get_ipython_dir |
|
39 | from IPython.utils.py3compat import cast_bytes | |
|
39 | 40 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, |
|
40 | 41 | Dict, List, Bool, Set, Any) |
|
41 | 42 | from IPython.external.decorator import decorator |
@@ -370,7 +371,7 b' class Client(HasTraits):' | |||
|
370 | 371 | if os.path.isfile(exec_key): |
|
371 | 372 | extra_args['keyfile'] = exec_key |
|
372 | 373 | else: |
|
373 |
exec_key = |
|
|
374 | exec_key = cast_bytes(exec_key) | |
|
374 | 375 | extra_args['key'] = exec_key |
|
375 | 376 | self.session = Session(**extra_args) |
|
376 | 377 | |
@@ -468,7 +469,7 b' class Client(HasTraits):' | |||
|
468 | 469 | if not isinstance(targets, (tuple, list, xrange)): |
|
469 | 470 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) |
|
470 | 471 | |
|
471 |
return [ |
|
|
472 | return [cast_bytes(self._engines[t]) for t in targets], list(targets) | |
|
472 | 473 | |
|
473 | 474 | def _connect(self, sshserver, ssh_kwargs, timeout): |
|
474 | 475 | """setup all our socket connections to the cluster. This is called from |
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice' | |||
|
23 | 23 | from zmq.eventloop import ioloop, zmqstream |
|
24 | 24 | |
|
25 | 25 | from IPython.config.configurable import LoggingConfigurable |
|
26 | from IPython.utils.py3compat import str_to_bytes | |
|
26 | 27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer |
|
27 | 28 | |
|
28 |
from IPython.parallel.util import |
|
|
29 | from IPython.parallel.util import log_errors | |
|
29 | 30 | |
|
30 | 31 | class Heart(object): |
|
31 | 32 | """A basic heart object for responding to a HeartMonitor. |
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):' | |||
|
123 | 124 | self.responses = set() |
|
124 | 125 | # print self.on_probation, self.hearts |
|
125 | 126 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
126 |
self.pingstream.send( |
|
|
127 | self.pingstream.send(str_to_bytes(str(self.lifetime))) | |
|
127 | 128 | # flush stream to force immediate socket send |
|
128 | 129 | self.pingstream.flush() |
|
129 | 130 | |
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):' | |||
|
151 | 152 | @log_errors |
|
152 | 153 | def handle_pong(self, msg): |
|
153 | 154 | "a heart just beat" |
|
154 |
current = |
|
|
155 |
last = |
|
|
155 | current = str_to_bytes(str(self.lifetime)) | |
|
156 | last = str_to_bytes(str(self.last_ping)) | |
|
156 | 157 | if msg[1] == current: |
|
157 | 158 | delta = time.time()-self.tic |
|
158 | 159 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream' | |||
|
28 | 28 | |
|
29 | 29 | # internal: |
|
30 | 30 | from IPython.utils.importstring import import_item |
|
31 | from IPython.utils.py3compat import cast_bytes | |
|
31 | 32 | from IPython.utils.traitlets import ( |
|
32 | 33 | HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName |
|
33 | 34 | ) |
@@ -441,7 +442,7 b' class Hub(SessionFactory):' | |||
|
441 | 442 | for t in targets: |
|
442 | 443 | # map raw identities to ids |
|
443 | 444 | if isinstance(t, (str,unicode)): |
|
444 | t = self.by_ident.get(t, t) | |
|
445 | t = self.by_ident.get(cast_bytes(t), t) | |
|
445 | 446 | _targets.append(t) |
|
446 | 447 | targets = _targets |
|
447 | 448 | bad_targets = [ t for t in targets if t not in self.ids ] |
@@ -719,8 +720,8 b' class Hub(SessionFactory):' | |||
|
719 | 720 | self.unassigned.remove(msg_id) |
|
720 | 721 | |
|
721 | 722 | header = msg['header'] |
|
722 |
engine_uuid = header.get('engine', |
|
|
723 | eid = self.by_ident.get(engine_uuid, None) | |
|
723 | engine_uuid = header.get('engine', u'') | |
|
724 | eid = self.by_ident.get(cast_bytes(engine_uuid), None) | |
|
724 | 725 | |
|
725 | 726 | status = header.get('status', None) |
|
726 | 727 | |
@@ -763,7 +764,7 b' class Hub(SessionFactory):' | |||
|
763 | 764 | # print (content) |
|
764 | 765 | msg_id = content['msg_id'] |
|
765 | 766 | engine_uuid = content['engine_id'] |
|
766 |
eid = self.by_ident[ |
|
|
767 | eid = self.by_ident[cast_bytes(engine_uuid)] | |
|
767 | 768 | |
|
768 | 769 | self.log.info("task::task %r arrived on %r", msg_id, eid) |
|
769 | 770 | if msg_id in self.unassigned: |
@@ -853,13 +854,13 b' class Hub(SessionFactory):' | |||
|
853 | 854 | """Register a new engine.""" |
|
854 | 855 | content = msg['content'] |
|
855 | 856 | try: |
|
856 |
queue = |
|
|
857 | queue = cast_bytes(content['queue']) | |
|
857 | 858 | except KeyError: |
|
858 | 859 | self.log.error("registration::queue not specified", exc_info=True) |
|
859 | 860 | return |
|
860 | 861 | heart = content.get('heartbeat', None) |
|
861 | 862 | if heart: |
|
862 |
heart = |
|
|
863 | heart = cast_bytes(heart) | |
|
863 | 864 | """register a new engine, and create the socket(s) necessary""" |
|
864 | 865 | eid = self._next_id |
|
865 | 866 | # print (eid, queue, reg, heart) |
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator' | |||
|
42 | 42 | from IPython.config.application import Application |
|
43 | 43 | from IPython.config.loader import Config |
|
44 | 44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes |
|
45 | from IPython.utils.py3compat import cast_bytes | |
|
45 | 46 | |
|
46 | 47 | from IPython.parallel import error, util |
|
47 | 48 | from IPython.parallel.factory import SessionFactory |
|
48 |
from IPython.parallel.util import connect_logger, local_logger |
|
|
49 | from IPython.parallel.util import connect_logger, local_logger | |
|
49 | 50 | |
|
50 | 51 | from .dependency import Dependency |
|
51 | 52 | |
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):' | |||
|
262 | 263 | self.log.error("Unhandled message type: %r"%msg_type) |
|
263 | 264 | else: |
|
264 | 265 | try: |
|
265 | handler(asbytes(msg['content']['queue'])) | |
|
266 | handler(cast_bytes(msg['content']['queue'])) | |
|
266 | 267 | except Exception: |
|
267 | 268 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
268 | 269 | |
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):' | |||
|
316 | 317 | # prevent double-handling of messages |
|
317 | 318 | continue |
|
318 | 319 | |
|
319 |
raw_msg = lost[msg_id] |
|
|
320 | raw_msg = lost[msg_id].raw_msg | |
|
320 | 321 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
321 | 322 | parent = self.session.unpack(msg[1].bytes) |
|
322 | 323 | idents = [engine, idents[0]] |
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):' | |||
|
370 | 371 | # get targets as a set of bytes objects |
|
371 | 372 | # from a list of unicode objects |
|
372 | 373 | targets = header.get('targets', []) |
|
373 | targets = map(asbytes, targets) | |
|
374 | targets = map(cast_bytes, targets) | |
|
374 | 375 | targets = set(targets) |
|
375 | 376 | |
|
376 | 377 | retries = header.get('retries', 0) |
@@ -27,11 +27,11 b' from IPython.external.ssh import tunnel' | |||
|
27 | 27 | from IPython.utils.traitlets import ( |
|
28 | 28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool |
|
29 | 29 | ) |
|
30 |
from IPython.utils import |
|
|
30 | from IPython.utils.py3compat import cast_bytes | |
|
31 | 31 | |
|
32 | 32 | from IPython.parallel.controller.heartmonitor import Heart |
|
33 | 33 | from IPython.parallel.factory import RegistrationFactory |
|
34 |
from IPython.parallel.util import disambiguate_url |
|
|
34 | from IPython.parallel.util import disambiguate_url | |
|
35 | 35 | |
|
36 | 36 | from IPython.zmq.session import Message |
|
37 | 37 | from IPython.zmq.ipkernel import Kernel |
@@ -69,7 +69,7 b' class EngineFactory(RegistrationFactory):' | |||
|
69 | 69 | bident = CBytes() |
|
70 | 70 | ident = Unicode() |
|
71 | 71 | def _ident_changed(self, name, old, new): |
|
72 | self.bident = asbytes(new) | |
|
72 | self.bident = cast_bytes(new) | |
|
73 | 73 | using_ssh=Bool(False) |
|
74 | 74 | |
|
75 | 75 | |
@@ -194,12 +194,12 b' class EngineFactory(RegistrationFactory):' | |||
|
194 | 194 | # Redirect input streams and set a display hook. |
|
195 | 195 | if self.out_stream_factory: |
|
196 | 196 | sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') |
|
197 |
sys.stdout.topic = |
|
|
197 | sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id) | |
|
198 | 198 | sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') |
|
199 |
sys.stderr.topic = |
|
|
199 | sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id) | |
|
200 | 200 | if self.display_hook_factory: |
|
201 | 201 | sys.displayhook = self.display_hook_factory(self.session, iopub_socket) |
|
202 |
sys.displayhook.topic = |
|
|
202 | sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id) | |
|
203 | 203 | |
|
204 | 204 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, |
|
205 | 205 | control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, |
@@ -124,12 +124,6 b' def log_errors(f, self, *args, **kwargs):' | |||
|
124 | 124 | self.log.error("Uncaught exception in %r" % f, exc_info=True) |
|
125 | 125 | |
|
126 | 126 | |
|
127 | def asbytes(s): | |
|
128 | """ensure that an object is ascii bytes""" | |
|
129 | if isinstance(s, unicode): | |
|
130 | s = s.encode('ascii') | |
|
131 | return s | |
|
132 | ||
|
133 | 127 | def is_url(url): |
|
134 | 128 | """boolean check for whether a string is a zmq url""" |
|
135 | 129 | if '://' not in url: |
General Comments 0
You need to be logged in to leave comments.
Login now