##// END OF EJS Templates
discard parallel.util.asbytes in favor of py3compat.cast_bytes
MinRK -
Show More
@@ -45,9 +45,10 b' from IPython.zmq.session import ('
45 45 from IPython.config.configurable import Configurable
46 46
47 47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url, asbytes
48 from IPython.parallel.util import disambiguate_url
49 49
50 50 from IPython.utils.importstring import import_item
51 from IPython.utils.py3compat import cast_bytes
51 52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
52 53
53 54
@@ -203,7 +204,7 b' class IPEngineApp(BaseParallelApplication):'
203 204 d = json.loads(f.read())
204 205
205 206 if 'exec_key' in d:
206 config.Session.key = asbytes(d['exec_key'])
207 config.Session.key = cast_bytes(d['exec_key'])
207 208
208 209 try:
209 210 config.EngineFactory.location
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication'
36 36 from IPython.utils.jsonutil import rekey
37 37 from IPython.utils.localinterfaces import LOCAL_IPS
38 38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.py3compat import cast_bytes
39 40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 41 Dict, List, Bool, Set, Any)
41 42 from IPython.external.decorator import decorator
@@ -370,7 +371,7 b' class Client(HasTraits):'
370 371 if os.path.isfile(exec_key):
371 372 extra_args['keyfile'] = exec_key
372 373 else:
373 exec_key = util.asbytes(exec_key)
374 exec_key = cast_bytes(exec_key)
374 375 extra_args['key'] = exec_key
375 376 self.session = Session(**extra_args)
376 377
@@ -468,7 +469,7 b' class Client(HasTraits):'
468 469 if not isinstance(targets, (tuple, list, xrange)):
469 470 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
470 471
471 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
472 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
472 473
473 474 def _connect(self, sshserver, ssh_kwargs, timeout):
474 475 """setup all our socket connections to the cluster. This is called from
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice'
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.py3compat import str_to_bytes
26 27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 28
28 from IPython.parallel.util import asbytes, log_errors
29 from IPython.parallel.util import log_errors
29 30
30 31 class Heart(object):
31 32 """A basic heart object for responding to a HeartMonitor.
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):'
123 124 self.responses = set()
124 125 # print self.on_probation, self.hearts
125 126 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 self.pingstream.send(asbytes(str(self.lifetime)))
127 self.pingstream.send(str_to_bytes(str(self.lifetime)))
127 128 # flush stream to force immediate socket send
128 129 self.pingstream.flush()
129 130
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):'
151 152 @log_errors
152 153 def handle_pong(self, msg):
153 154 "a heart just beat"
154 current = asbytes(str(self.lifetime))
155 last = asbytes(str(self.last_ping))
155 current = str_to_bytes(str(self.lifetime))
156 last = str_to_bytes(str(self.last_ping))
156 157 if msg[1] == current:
157 158 delta = time.time()-self.tic
158 159 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream'
28 28
29 29 # internal:
30 30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 32 from IPython.utils.traitlets import (
32 33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 34 )
@@ -441,7 +442,7 b' class Hub(SessionFactory):'
441 442 for t in targets:
442 443 # map raw identities to ids
443 444 if isinstance(t, (str,unicode)):
444 t = self.by_ident.get(t, t)
445 t = self.by_ident.get(cast_bytes(t), t)
445 446 _targets.append(t)
446 447 targets = _targets
447 448 bad_targets = [ t for t in targets if t not in self.ids ]
@@ -719,8 +720,8 b' class Hub(SessionFactory):'
719 720 self.unassigned.remove(msg_id)
720 721
721 722 header = msg['header']
722 engine_uuid = header.get('engine', None)
723 eid = self.by_ident.get(engine_uuid, None)
723 engine_uuid = header.get('engine', u'')
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
724 725
725 726 status = header.get('status', None)
726 727
@@ -763,7 +764,7 b' class Hub(SessionFactory):'
763 764 # print (content)
764 765 msg_id = content['msg_id']
765 766 engine_uuid = content['engine_id']
766 eid = self.by_ident[util.asbytes(engine_uuid)]
767 eid = self.by_ident[cast_bytes(engine_uuid)]
767 768
768 769 self.log.info("task::task %r arrived on %r", msg_id, eid)
769 770 if msg_id in self.unassigned:
@@ -853,13 +854,13 b' class Hub(SessionFactory):'
853 854 """Register a new engine."""
854 855 content = msg['content']
855 856 try:
856 queue = util.asbytes(content['queue'])
857 queue = cast_bytes(content['queue'])
857 858 except KeyError:
858 859 self.log.error("registration::queue not specified", exc_info=True)
859 860 return
860 861 heart = content.get('heartbeat', None)
861 862 if heart:
862 heart = util.asbytes(heart)
863 heart = cast_bytes(heart)
863 864 """register a new engine, and create the socket(s) necessary"""
864 865 eid = self._next_id
865 866 # print (eid, queue, reg, heart)
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator'
42 42 from IPython.config.application import Application
43 43 from IPython.config.loader import Config
44 44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45 from IPython.utils.py3compat import cast_bytes
45 46
46 47 from IPython.parallel import error, util
47 48 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger, asbytes
49 from IPython.parallel.util import connect_logger, local_logger
49 50
50 51 from .dependency import Dependency
51 52
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):'
262 263 self.log.error("Unhandled message type: %r"%msg_type)
263 264 else:
264 265 try:
265 handler(asbytes(msg['content']['queue']))
266 handler(cast_bytes(msg['content']['queue']))
266 267 except Exception:
267 268 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
268 269
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):'
316 317 # prevent double-handling of messages
317 318 continue
318 319
319 raw_msg = lost[msg_id][0]
320 raw_msg = lost[msg_id].raw_msg
320 321 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 322 parent = self.session.unpack(msg[1].bytes)
322 323 idents = [engine, idents[0]]
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):'
370 371 # get targets as a set of bytes objects
371 372 # from a list of unicode objects
372 373 targets = header.get('targets', [])
373 targets = map(asbytes, targets)
374 targets = map(cast_bytes, targets)
374 375 targets = set(targets)
375 376
376 377 retries = header.get('retries', 0)
@@ -27,11 +27,11 b' from IPython.external.ssh import tunnel'
27 27 from IPython.utils.traitlets import (
28 28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 29 )
30 from IPython.utils import py3compat
30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 from IPython.parallel.util import disambiguate_url, asbytes
34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37 37 from IPython.zmq.ipkernel import Kernel
@@ -69,7 +69,7 b' class EngineFactory(RegistrationFactory):'
69 69 bident = CBytes()
70 70 ident = Unicode()
71 71 def _ident_changed(self, name, old, new):
72 self.bident = asbytes(new)
72 self.bident = cast_bytes(new)
73 73 using_ssh=Bool(False)
74 74
75 75
@@ -194,12 +194,12 b' class EngineFactory(RegistrationFactory):'
194 194 # Redirect input streams and set a display hook.
195 195 if self.out_stream_factory:
196 196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
197 sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id)
197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
198 198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
199 sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id)
199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
200 200 if self.display_hook_factory:
201 201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
202 sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id)
202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
203 203
204 204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
205 205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
@@ -124,12 +124,6 b' def log_errors(f, self, *args, **kwargs):'
124 124 self.log.error("Uncaught exception in %r" % f, exc_info=True)
125 125
126 126
127 def asbytes(s):
128 """ensure that an object is ascii bytes"""
129 if isinstance(s, unicode):
130 s = s.encode('ascii')
131 return s
132
133 127 def is_url(url):
134 128 """boolean check for whether a string is a zmq url"""
135 129 if '://' not in url:
General Comments 0
You need to be logged in to leave comments. Login now