##// END OF EJS Templates
discard parallel.util.asbytes in favor of py3compat.cast_bytes
MinRK -
Show More
@@ -45,9 +45,10 b' from IPython.zmq.session import ('
45 from IPython.config.configurable import Configurable
45 from IPython.config.configurable import Configurable
46
46
47 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url, asbytes
48 from IPython.parallel.util import disambiguate_url
49
49
50 from IPython.utils.importstring import import_item
50 from IPython.utils.importstring import import_item
51 from IPython.utils.py3compat import cast_bytes
51 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
52
53
53
54
@@ -203,7 +204,7 b' class IPEngineApp(BaseParallelApplication):'
203 d = json.loads(f.read())
204 d = json.loads(f.read())
204
205
205 if 'exec_key' in d:
206 if 'exec_key' in d:
206 config.Session.key = asbytes(d['exec_key'])
207 config.Session.key = cast_bytes(d['exec_key'])
207
208
208 try:
209 try:
209 config.EngineFactory.location
210 config.EngineFactory.location
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication'
36 from IPython.utils.jsonutil import rekey
36 from IPython.utils.jsonutil import rekey
37 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.py3compat import cast_bytes
39 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 Dict, List, Bool, Set, Any)
41 Dict, List, Bool, Set, Any)
41 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
@@ -370,7 +371,7 b' class Client(HasTraits):'
370 if os.path.isfile(exec_key):
371 if os.path.isfile(exec_key):
371 extra_args['keyfile'] = exec_key
372 extra_args['keyfile'] = exec_key
372 else:
373 else:
373 exec_key = util.asbytes(exec_key)
374 exec_key = cast_bytes(exec_key)
374 extra_args['key'] = exec_key
375 extra_args['key'] = exec_key
375 self.session = Session(**extra_args)
376 self.session = Session(**extra_args)
376
377
@@ -468,7 +469,7 b' class Client(HasTraits):'
468 if not isinstance(targets, (tuple, list, xrange)):
469 if not isinstance(targets, (tuple, list, xrange)):
469 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
470 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
470
471
471 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
472 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
472
473
473 def _connect(self, sshserver, ssh_kwargs, timeout):
474 def _connect(self, sshserver, ssh_kwargs, timeout):
474 """setup all our socket connections to the cluster. This is called from
475 """setup all our socket connections to the cluster. This is called from
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice'
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.py3compat import str_to_bytes
26 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27
28
28 from IPython.parallel.util import asbytes, log_errors
29 from IPython.parallel.util import log_errors
29
30
30 class Heart(object):
31 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
32 """A basic heart object for responding to a HeartMonitor.
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):'
123 self.responses = set()
124 self.responses = set()
124 # print self.on_probation, self.hearts
125 # print self.on_probation, self.hearts
125 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
126 self.pingstream.send(asbytes(str(self.lifetime)))
127 self.pingstream.send(str_to_bytes(str(self.lifetime)))
127 # flush stream to force immediate socket send
128 # flush stream to force immediate socket send
128 self.pingstream.flush()
129 self.pingstream.flush()
129
130
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):'
151 @log_errors
152 @log_errors
152 def handle_pong(self, msg):
153 def handle_pong(self, msg):
153 "a heart just beat"
154 "a heart just beat"
154 current = asbytes(str(self.lifetime))
155 current = str_to_bytes(str(self.lifetime))
155 last = asbytes(str(self.last_ping))
156 last = str_to_bytes(str(self.last_ping))
156 if msg[1] == current:
157 if msg[1] == current:
157 delta = time.time()-self.tic
158 delta = time.time()-self.tic
158 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
159 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream'
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 )
34 )
@@ -441,7 +442,7 b' class Hub(SessionFactory):'
441 for t in targets:
442 for t in targets:
442 # map raw identities to ids
443 # map raw identities to ids
443 if isinstance(t, (str,unicode)):
444 if isinstance(t, (str,unicode)):
444 t = self.by_ident.get(t, t)
445 t = self.by_ident.get(cast_bytes(t), t)
445 _targets.append(t)
446 _targets.append(t)
446 targets = _targets
447 targets = _targets
447 bad_targets = [ t for t in targets if t not in self.ids ]
448 bad_targets = [ t for t in targets if t not in self.ids ]
@@ -719,8 +720,8 b' class Hub(SessionFactory):'
719 self.unassigned.remove(msg_id)
720 self.unassigned.remove(msg_id)
720
721
721 header = msg['header']
722 header = msg['header']
722 engine_uuid = header.get('engine', None)
723 engine_uuid = header.get('engine', u'')
723 eid = self.by_ident.get(engine_uuid, None)
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
724
725
725 status = header.get('status', None)
726 status = header.get('status', None)
726
727
@@ -763,7 +764,7 b' class Hub(SessionFactory):'
763 # print (content)
764 # print (content)
764 msg_id = content['msg_id']
765 msg_id = content['msg_id']
765 engine_uuid = content['engine_id']
766 engine_uuid = content['engine_id']
766 eid = self.by_ident[util.asbytes(engine_uuid)]
767 eid = self.by_ident[cast_bytes(engine_uuid)]
767
768
768 self.log.info("task::task %r arrived on %r", msg_id, eid)
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
769 if msg_id in self.unassigned:
770 if msg_id in self.unassigned:
@@ -853,13 +854,13 b' class Hub(SessionFactory):'
853 """Register a new engine."""
854 """Register a new engine."""
854 content = msg['content']
855 content = msg['content']
855 try:
856 try:
856 queue = util.asbytes(content['queue'])
857 queue = cast_bytes(content['queue'])
857 except KeyError:
858 except KeyError:
858 self.log.error("registration::queue not specified", exc_info=True)
859 self.log.error("registration::queue not specified", exc_info=True)
859 return
860 return
860 heart = content.get('heartbeat', None)
861 heart = content.get('heartbeat', None)
861 if heart:
862 if heart:
862 heart = util.asbytes(heart)
863 heart = cast_bytes(heart)
863 """register a new engine, and create the socket(s) necessary"""
864 """register a new engine, and create the socket(s) necessary"""
864 eid = self._next_id
865 eid = self._next_id
865 # print (eid, queue, reg, heart)
866 # print (eid, queue, reg, heart)
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator'
42 from IPython.config.application import Application
42 from IPython.config.application import Application
43 from IPython.config.loader import Config
43 from IPython.config.loader import Config
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45 from IPython.utils.py3compat import cast_bytes
45
46
46 from IPython.parallel import error, util
47 from IPython.parallel import error, util
47 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger, asbytes
49 from IPython.parallel.util import connect_logger, local_logger
49
50
50 from .dependency import Dependency
51 from .dependency import Dependency
51
52
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):'
262 self.log.error("Unhandled message type: %r"%msg_type)
263 self.log.error("Unhandled message type: %r"%msg_type)
263 else:
264 else:
264 try:
265 try:
265 handler(asbytes(msg['content']['queue']))
266 handler(cast_bytes(msg['content']['queue']))
266 except Exception:
267 except Exception:
267 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
268 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
268
269
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):'
316 # prevent double-handling of messages
317 # prevent double-handling of messages
317 continue
318 continue
318
319
319 raw_msg = lost[msg_id][0]
320 raw_msg = lost[msg_id].raw_msg
320 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 parent = self.session.unpack(msg[1].bytes)
322 parent = self.session.unpack(msg[1].bytes)
322 idents = [engine, idents[0]]
323 idents = [engine, idents[0]]
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):'
370 # get targets as a set of bytes objects
371 # get targets as a set of bytes objects
371 # from a list of unicode objects
372 # from a list of unicode objects
372 targets = header.get('targets', [])
373 targets = header.get('targets', [])
373 targets = map(asbytes, targets)
374 targets = map(cast_bytes, targets)
374 targets = set(targets)
375 targets = set(targets)
375
376
376 retries = header.get('retries', 0)
377 retries = header.get('retries', 0)
@@ -27,11 +27,11 b' from IPython.external.ssh import tunnel'
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
29 )
29 )
30 from IPython.utils import py3compat
30 from IPython.utils.py3compat import cast_bytes
31
31
32 from IPython.parallel.controller.heartmonitor import Heart
32 from IPython.parallel.controller.heartmonitor import Heart
33 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
34 from IPython.parallel.util import disambiguate_url, asbytes
34 from IPython.parallel.util import disambiguate_url
35
35
36 from IPython.zmq.session import Message
36 from IPython.zmq.session import Message
37 from IPython.zmq.ipkernel import Kernel
37 from IPython.zmq.ipkernel import Kernel
@@ -69,7 +69,7 b' class EngineFactory(RegistrationFactory):'
69 bident = CBytes()
69 bident = CBytes()
70 ident = Unicode()
70 ident = Unicode()
71 def _ident_changed(self, name, old, new):
71 def _ident_changed(self, name, old, new):
72 self.bident = asbytes(new)
72 self.bident = cast_bytes(new)
73 using_ssh=Bool(False)
73 using_ssh=Bool(False)
74
74
75
75
@@ -194,12 +194,12 b' class EngineFactory(RegistrationFactory):'
194 # Redirect input streams and set a display hook.
194 # Redirect input streams and set a display hook.
195 if self.out_stream_factory:
195 if self.out_stream_factory:
196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
197 sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id)
197 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
199 sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id)
199 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
200 if self.display_hook_factory:
200 if self.display_hook_factory:
201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
202 sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id)
202 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
203
203
204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
@@ -124,12 +124,6 b' def log_errors(f, self, *args, **kwargs):'
124 self.log.error("Uncaught exception in %r" % f, exc_info=True)
124 self.log.error("Uncaught exception in %r" % f, exc_info=True)
125
125
126
126
127 def asbytes(s):
128 """ensure that an object is ascii bytes"""
129 if isinstance(s, unicode):
130 s = s.encode('ascii')
131 return s
132
133 def is_url(url):
127 def is_url(url):
134 """boolean check for whether a string is a zmq url"""
128 """boolean check for whether a string is a zmq url"""
135 if '://' not in url:
129 if '://' not in url:
General Comments 0
You need to be logged in to leave comments. Login now