diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index 65ab165..b249a74 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -45,9 +45,10 @@ from IPython.zmq.session import ( from IPython.config.configurable import Configurable from IPython.parallel.engine.engine import EngineFactory -from IPython.parallel.util import disambiguate_url, asbytes +from IPython.parallel.util import disambiguate_url from IPython.utils.importstring import import_item +from IPython.utils.py3compat import cast_bytes from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float @@ -203,7 +204,7 @@ class IPEngineApp(BaseParallelApplication): d = json.loads(f.read()) if 'exec_key' in d: - config.Session.key = asbytes(d['exec_key']) + config.Session.key = cast_bytes(d['exec_key']) try: config.EngineFactory.location diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 3ef8bf0..3a9bfcd 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -36,6 +36,7 @@ from IPython.core.application import BaseIPythonApplication from IPython.utils.jsonutil import rekey from IPython.utils.localinterfaces import LOCAL_IPS from IPython.utils.path import get_ipython_dir +from IPython.utils.py3compat import cast_bytes from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, Dict, List, Bool, Set, Any) from IPython.external.decorator import decorator @@ -370,7 +371,7 @@ class Client(HasTraits): if os.path.isfile(exec_key): extra_args['keyfile'] = exec_key else: - exec_key = util.asbytes(exec_key) + exec_key = cast_bytes(exec_key) extra_args['key'] = exec_key self.session = Session(**extra_args) @@ -468,7 +469,7 @@ class Client(HasTraits): if not isinstance(targets, (tuple, list, xrange)): raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) - return [util.asbytes(self._engines[t]) for t in targets], list(targets) + return [cast_bytes(self._engines[t]) for t in targets], list(targets) def _connect(self, sshserver, ssh_kwargs, timeout): """setup all our socket connections to the cluster. This is called from diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index 83d967d..927af4c 100755 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -23,9 +23,10 @@ from zmq.devices import ThreadDevice from zmq.eventloop import ioloop, zmqstream from IPython.config.configurable import LoggingConfigurable +from IPython.utils.py3compat import str_to_bytes from IPython.utils.traitlets import Set, Instance, CFloat, Integer -from IPython.parallel.util import asbytes, log_errors +from IPython.parallel.util import log_errors class Heart(object): """A basic heart object for responding to a HeartMonitor. @@ -123,7 +124,7 @@ class HeartMonitor(LoggingConfigurable): self.responses = set() # print self.on_probation, self.hearts # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) - self.pingstream.send(asbytes(str(self.lifetime))) + self.pingstream.send(str_to_bytes(str(self.lifetime))) # flush stream to force immediate socket send self.pingstream.flush() @@ -151,8 +152,8 @@ class HeartMonitor(LoggingConfigurable): @log_errors def handle_pong(self, msg): "a heart just beat" - current = asbytes(str(self.lifetime)) - last = asbytes(str(self.last_ping)) + current = str_to_bytes(str(self.lifetime)) + last = str_to_bytes(str(self.last_ping)) if msg[1] == current: delta = time.time()-self.tic # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 41ab511..259b0ea 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -28,6 +28,7 @@ from zmq.eventloop.zmqstream import ZMQStream # internal: from IPython.utils.importstring import import_item +from IPython.utils.py3compat import cast_bytes from IPython.utils.traitlets import ( HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName ) @@ -441,7 +442,7 @@ class Hub(SessionFactory): for t in targets: # map raw identities to ids if isinstance(t, (str,unicode)): - t = self.by_ident.get(t, t) + t = self.by_ident.get(cast_bytes(t), t) _targets.append(t) targets = _targets bad_targets = [ t for t in targets if t not in self.ids ] @@ -719,8 +720,8 @@ class Hub(SessionFactory): self.unassigned.remove(msg_id) header = msg['header'] - engine_uuid = header.get('engine', None) - eid = self.by_ident.get(engine_uuid, None) + engine_uuid = header.get('engine', u'') + eid = self.by_ident.get(cast_bytes(engine_uuid), None) status = header.get('status', None) @@ -763,7 +764,7 @@ class Hub(SessionFactory): # print (content) msg_id = content['msg_id'] engine_uuid = content['engine_id'] - eid = self.by_ident[util.asbytes(engine_uuid)] + eid = self.by_ident[cast_bytes(engine_uuid)] self.log.info("task::task %r arrived on %r", msg_id, eid) if msg_id in self.unassigned: @@ -853,13 +854,13 @@ class Hub(SessionFactory): """Register a new engine.""" content = msg['content'] try: - queue = util.asbytes(content['queue']) + queue = cast_bytes(content['queue']) except KeyError: self.log.error("registration::queue not specified", exc_info=True) return heart = content.get('heartbeat', None) if heart: - heart = util.asbytes(heart) + heart = cast_bytes(heart) """register a new engine, and create the socket(s) necessary""" eid = self._next_id # print (eid, queue, reg, heart) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 25c4a2c..eea1043 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -42,10 +42,11 @@ from IPython.external.decorator import decorator from IPython.config.application import Application from IPython.config.loader import Config from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes +from IPython.utils.py3compat import cast_bytes from IPython.parallel import error, util from IPython.parallel.factory import SessionFactory -from IPython.parallel.util import connect_logger, local_logger, asbytes +from IPython.parallel.util import connect_logger, local_logger from .dependency import Dependency @@ -262,7 +263,7 @@ class TaskScheduler(SessionFactory): self.log.error("Unhandled message type: %r"%msg_type) else: try: - handler(asbytes(msg['content']['queue'])) + handler(cast_bytes(msg['content']['queue'])) except Exception: self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) @@ -316,7 +317,7 @@ class TaskScheduler(SessionFactory): # prevent double-handling of messages continue - raw_msg = lost[msg_id][0] + raw_msg = lost[msg_id].raw_msg idents,msg = self.session.feed_identities(raw_msg, copy=False) parent = self.session.unpack(msg[1].bytes) idents = [engine, idents[0]] @@ -370,7 +371,7 @@ class TaskScheduler(SessionFactory): # get targets as a set of bytes objects # from a list of unicode objects targets = header.get('targets', []) - targets = map(asbytes, targets) + targets = map(cast_bytes, targets) targets = set(targets) retries = header.get('retries', 0) diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index e9578cb..b5f5062 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -27,11 +27,11 @@ from IPython.external.ssh import tunnel from IPython.utils.traitlets import ( Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool ) -from IPython.utils import py3compat +from IPython.utils.py3compat import cast_bytes from IPython.parallel.controller.heartmonitor import Heart from IPython.parallel.factory import RegistrationFactory -from IPython.parallel.util import disambiguate_url, asbytes +from IPython.parallel.util import disambiguate_url from IPython.zmq.session import Message from IPython.zmq.ipkernel import Kernel @@ -69,7 +69,7 @@ class EngineFactory(RegistrationFactory): bident = CBytes() ident = Unicode() def _ident_changed(self, name, old, new): - self.bident = asbytes(new) + self.bident = cast_bytes(new) using_ssh=Bool(False) @@ -194,12 +194,12 @@ class EngineFactory(RegistrationFactory): # Redirect input streams and set a display hook. if self.out_stream_factory: sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') - sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id) + sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id) sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') - sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id) + sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id) if self.display_hook_factory: sys.displayhook = self.display_hook_factory(self.session, iopub_socket) - sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id) + sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id) self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index 03c0cf9..5b3e5a6 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -124,12 +124,6 @@ def log_errors(f, self, *args, **kwargs): self.log.error("Uncaught exception in %r" % f, exc_info=True) -def asbytes(s): - """ensure that an object is ascii bytes""" - if isinstance(s, unicode): - s = s.encode('ascii') - return s - def is_url(url): """boolean check for whether a string is a zmq url""" if '://' not in url: