Show More
@@ -45,9 +45,10 b' from IPython.zmq.session import (' | |||||
45 | from IPython.config.configurable import Configurable |
|
45 | from IPython.config.configurable import Configurable | |
46 |
|
46 | |||
47 | from IPython.parallel.engine.engine import EngineFactory |
|
47 | from IPython.parallel.engine.engine import EngineFactory | |
48 |
from IPython.parallel.util import disambiguate_url |
|
48 | from IPython.parallel.util import disambiguate_url | |
49 |
|
49 | |||
50 | from IPython.utils.importstring import import_item |
|
50 | from IPython.utils.importstring import import_item | |
|
51 | from IPython.utils.py3compat import cast_bytes | |||
51 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float |
|
52 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float | |
52 |
|
53 | |||
53 |
|
54 | |||
@@ -203,7 +204,7 b' class IPEngineApp(BaseParallelApplication):' | |||||
203 | d = json.loads(f.read()) |
|
204 | d = json.loads(f.read()) | |
204 |
|
205 | |||
205 | if 'exec_key' in d: |
|
206 | if 'exec_key' in d: | |
206 | config.Session.key = asbytes(d['exec_key']) |
|
207 | config.Session.key = cast_bytes(d['exec_key']) | |
207 |
|
208 | |||
208 | try: |
|
209 | try: | |
209 | config.EngineFactory.location |
|
210 | config.EngineFactory.location |
@@ -36,6 +36,7 b' from IPython.core.application import BaseIPythonApplication' | |||||
36 | from IPython.utils.jsonutil import rekey |
|
36 | from IPython.utils.jsonutil import rekey | |
37 | from IPython.utils.localinterfaces import LOCAL_IPS |
|
37 | from IPython.utils.localinterfaces import LOCAL_IPS | |
38 | from IPython.utils.path import get_ipython_dir |
|
38 | from IPython.utils.path import get_ipython_dir | |
|
39 | from IPython.utils.py3compat import cast_bytes | |||
39 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, |
|
40 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, | |
40 | Dict, List, Bool, Set, Any) |
|
41 | Dict, List, Bool, Set, Any) | |
41 | from IPython.external.decorator import decorator |
|
42 | from IPython.external.decorator import decorator | |
@@ -370,7 +371,7 b' class Client(HasTraits):' | |||||
370 | if os.path.isfile(exec_key): |
|
371 | if os.path.isfile(exec_key): | |
371 | extra_args['keyfile'] = exec_key |
|
372 | extra_args['keyfile'] = exec_key | |
372 | else: |
|
373 | else: | |
373 |
exec_key = |
|
374 | exec_key = cast_bytes(exec_key) | |
374 | extra_args['key'] = exec_key |
|
375 | extra_args['key'] = exec_key | |
375 | self.session = Session(**extra_args) |
|
376 | self.session = Session(**extra_args) | |
376 |
|
377 | |||
@@ -468,7 +469,7 b' class Client(HasTraits):' | |||||
468 | if not isinstance(targets, (tuple, list, xrange)): |
|
469 | if not isinstance(targets, (tuple, list, xrange)): | |
469 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) |
|
470 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) | |
470 |
|
471 | |||
471 |
return [ |
|
472 | return [cast_bytes(self._engines[t]) for t in targets], list(targets) | |
472 |
|
473 | |||
473 | def _connect(self, sshserver, ssh_kwargs, timeout): |
|
474 | def _connect(self, sshserver, ssh_kwargs, timeout): | |
474 | """setup all our socket connections to the cluster. This is called from |
|
475 | """setup all our socket connections to the cluster. This is called from |
@@ -23,9 +23,10 b' from zmq.devices import ThreadDevice' | |||||
23 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
24 |
|
24 | |||
25 | from IPython.config.configurable import LoggingConfigurable |
|
25 | from IPython.config.configurable import LoggingConfigurable | |
|
26 | from IPython.utils.py3compat import str_to_bytes | |||
26 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer |
|
27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer | |
27 |
|
28 | |||
28 |
from IPython.parallel.util import |
|
29 | from IPython.parallel.util import log_errors | |
29 |
|
30 | |||
30 | class Heart(object): |
|
31 | class Heart(object): | |
31 | """A basic heart object for responding to a HeartMonitor. |
|
32 | """A basic heart object for responding to a HeartMonitor. | |
@@ -123,7 +124,7 b' class HeartMonitor(LoggingConfigurable):' | |||||
123 | self.responses = set() |
|
124 | self.responses = set() | |
124 | # print self.on_probation, self.hearts |
|
125 | # print self.on_probation, self.hearts | |
125 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
126 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) | |
126 |
self.pingstream.send( |
|
127 | self.pingstream.send(str_to_bytes(str(self.lifetime))) | |
127 | # flush stream to force immediate socket send |
|
128 | # flush stream to force immediate socket send | |
128 | self.pingstream.flush() |
|
129 | self.pingstream.flush() | |
129 |
|
130 | |||
@@ -151,8 +152,8 b' class HeartMonitor(LoggingConfigurable):' | |||||
151 | @log_errors |
|
152 | @log_errors | |
152 | def handle_pong(self, msg): |
|
153 | def handle_pong(self, msg): | |
153 | "a heart just beat" |
|
154 | "a heart just beat" | |
154 |
current = |
|
155 | current = str_to_bytes(str(self.lifetime)) | |
155 |
last = |
|
156 | last = str_to_bytes(str(self.last_ping)) | |
156 | if msg[1] == current: |
|
157 | if msg[1] == current: | |
157 | delta = time.time()-self.tic |
|
158 | delta = time.time()-self.tic | |
158 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
159 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
@@ -28,6 +28,7 b' from zmq.eventloop.zmqstream import ZMQStream' | |||||
28 |
|
28 | |||
29 | # internal: |
|
29 | # internal: | |
30 | from IPython.utils.importstring import import_item |
|
30 | from IPython.utils.importstring import import_item | |
|
31 | from IPython.utils.py3compat import cast_bytes | |||
31 | from IPython.utils.traitlets import ( |
|
32 | from IPython.utils.traitlets import ( | |
32 | HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName |
|
33 | HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName | |
33 | ) |
|
34 | ) | |
@@ -441,7 +442,7 b' class Hub(SessionFactory):' | |||||
441 | for t in targets: |
|
442 | for t in targets: | |
442 | # map raw identities to ids |
|
443 | # map raw identities to ids | |
443 | if isinstance(t, (str,unicode)): |
|
444 | if isinstance(t, (str,unicode)): | |
444 | t = self.by_ident.get(t, t) |
|
445 | t = self.by_ident.get(cast_bytes(t), t) | |
445 | _targets.append(t) |
|
446 | _targets.append(t) | |
446 | targets = _targets |
|
447 | targets = _targets | |
447 | bad_targets = [ t for t in targets if t not in self.ids ] |
|
448 | bad_targets = [ t for t in targets if t not in self.ids ] | |
@@ -719,8 +720,8 b' class Hub(SessionFactory):' | |||||
719 | self.unassigned.remove(msg_id) |
|
720 | self.unassigned.remove(msg_id) | |
720 |
|
721 | |||
721 | header = msg['header'] |
|
722 | header = msg['header'] | |
722 |
engine_uuid = header.get('engine', |
|
723 | engine_uuid = header.get('engine', u'') | |
723 | eid = self.by_ident.get(engine_uuid, None) |
|
724 | eid = self.by_ident.get(cast_bytes(engine_uuid), None) | |
724 |
|
725 | |||
725 | status = header.get('status', None) |
|
726 | status = header.get('status', None) | |
726 |
|
727 | |||
@@ -763,7 +764,7 b' class Hub(SessionFactory):' | |||||
763 | # print (content) |
|
764 | # print (content) | |
764 | msg_id = content['msg_id'] |
|
765 | msg_id = content['msg_id'] | |
765 | engine_uuid = content['engine_id'] |
|
766 | engine_uuid = content['engine_id'] | |
766 |
eid = self.by_ident[ |
|
767 | eid = self.by_ident[cast_bytes(engine_uuid)] | |
767 |
|
768 | |||
768 | self.log.info("task::task %r arrived on %r", msg_id, eid) |
|
769 | self.log.info("task::task %r arrived on %r", msg_id, eid) | |
769 | if msg_id in self.unassigned: |
|
770 | if msg_id in self.unassigned: | |
@@ -853,13 +854,13 b' class Hub(SessionFactory):' | |||||
853 | """Register a new engine.""" |
|
854 | """Register a new engine.""" | |
854 | content = msg['content'] |
|
855 | content = msg['content'] | |
855 | try: |
|
856 | try: | |
856 |
queue = |
|
857 | queue = cast_bytes(content['queue']) | |
857 | except KeyError: |
|
858 | except KeyError: | |
858 | self.log.error("registration::queue not specified", exc_info=True) |
|
859 | self.log.error("registration::queue not specified", exc_info=True) | |
859 | return |
|
860 | return | |
860 | heart = content.get('heartbeat', None) |
|
861 | heart = content.get('heartbeat', None) | |
861 | if heart: |
|
862 | if heart: | |
862 |
heart = |
|
863 | heart = cast_bytes(heart) | |
863 | """register a new engine, and create the socket(s) necessary""" |
|
864 | """register a new engine, and create the socket(s) necessary""" | |
864 | eid = self._next_id |
|
865 | eid = self._next_id | |
865 | # print (eid, queue, reg, heart) |
|
866 | # print (eid, queue, reg, heart) |
@@ -42,10 +42,11 b' from IPython.external.decorator import decorator' | |||||
42 | from IPython.config.application import Application |
|
42 | from IPython.config.application import Application | |
43 | from IPython.config.loader import Config |
|
43 | from IPython.config.loader import Config | |
44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes |
|
44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes | |
|
45 | from IPython.utils.py3compat import cast_bytes | |||
45 |
|
46 | |||
46 | from IPython.parallel import error, util |
|
47 | from IPython.parallel import error, util | |
47 | from IPython.parallel.factory import SessionFactory |
|
48 | from IPython.parallel.factory import SessionFactory | |
48 |
from IPython.parallel.util import connect_logger, local_logger |
|
49 | from IPython.parallel.util import connect_logger, local_logger | |
49 |
|
50 | |||
50 | from .dependency import Dependency |
|
51 | from .dependency import Dependency | |
51 |
|
52 | |||
@@ -262,7 +263,7 b' class TaskScheduler(SessionFactory):' | |||||
262 | self.log.error("Unhandled message type: %r"%msg_type) |
|
263 | self.log.error("Unhandled message type: %r"%msg_type) | |
263 | else: |
|
264 | else: | |
264 | try: |
|
265 | try: | |
265 | handler(asbytes(msg['content']['queue'])) |
|
266 | handler(cast_bytes(msg['content']['queue'])) | |
266 | except Exception: |
|
267 | except Exception: | |
267 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
268 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) | |
268 |
|
269 | |||
@@ -316,7 +317,7 b' class TaskScheduler(SessionFactory):' | |||||
316 | # prevent double-handling of messages |
|
317 | # prevent double-handling of messages | |
317 | continue |
|
318 | continue | |
318 |
|
319 | |||
319 |
raw_msg = lost[msg_id] |
|
320 | raw_msg = lost[msg_id].raw_msg | |
320 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
321 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
321 | parent = self.session.unpack(msg[1].bytes) |
|
322 | parent = self.session.unpack(msg[1].bytes) | |
322 | idents = [engine, idents[0]] |
|
323 | idents = [engine, idents[0]] | |
@@ -370,7 +371,7 b' class TaskScheduler(SessionFactory):' | |||||
370 | # get targets as a set of bytes objects |
|
371 | # get targets as a set of bytes objects | |
371 | # from a list of unicode objects |
|
372 | # from a list of unicode objects | |
372 | targets = header.get('targets', []) |
|
373 | targets = header.get('targets', []) | |
373 | targets = map(asbytes, targets) |
|
374 | targets = map(cast_bytes, targets) | |
374 | targets = set(targets) |
|
375 | targets = set(targets) | |
375 |
|
376 | |||
376 | retries = header.get('retries', 0) |
|
377 | retries = header.get('retries', 0) |
@@ -27,11 +27,11 b' from IPython.external.ssh import tunnel' | |||||
27 | from IPython.utils.traitlets import ( |
|
27 | from IPython.utils.traitlets import ( | |
28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool |
|
28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool | |
29 | ) |
|
29 | ) | |
30 |
from IPython.utils import |
|
30 | from IPython.utils.py3compat import cast_bytes | |
31 |
|
31 | |||
32 | from IPython.parallel.controller.heartmonitor import Heart |
|
32 | from IPython.parallel.controller.heartmonitor import Heart | |
33 | from IPython.parallel.factory import RegistrationFactory |
|
33 | from IPython.parallel.factory import RegistrationFactory | |
34 |
from IPython.parallel.util import disambiguate_url |
|
34 | from IPython.parallel.util import disambiguate_url | |
35 |
|
35 | |||
36 | from IPython.zmq.session import Message |
|
36 | from IPython.zmq.session import Message | |
37 | from IPython.zmq.ipkernel import Kernel |
|
37 | from IPython.zmq.ipkernel import Kernel | |
@@ -69,7 +69,7 b' class EngineFactory(RegistrationFactory):' | |||||
69 | bident = CBytes() |
|
69 | bident = CBytes() | |
70 | ident = Unicode() |
|
70 | ident = Unicode() | |
71 | def _ident_changed(self, name, old, new): |
|
71 | def _ident_changed(self, name, old, new): | |
72 | self.bident = asbytes(new) |
|
72 | self.bident = cast_bytes(new) | |
73 | using_ssh=Bool(False) |
|
73 | using_ssh=Bool(False) | |
74 |
|
74 | |||
75 |
|
75 | |||
@@ -194,12 +194,12 b' class EngineFactory(RegistrationFactory):' | |||||
194 | # Redirect input streams and set a display hook. |
|
194 | # Redirect input streams and set a display hook. | |
195 | if self.out_stream_factory: |
|
195 | if self.out_stream_factory: | |
196 | sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') |
|
196 | sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') | |
197 |
sys.stdout.topic = |
|
197 | sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id) | |
198 | sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') |
|
198 | sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') | |
199 |
sys.stderr.topic = |
|
199 | sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id) | |
200 | if self.display_hook_factory: |
|
200 | if self.display_hook_factory: | |
201 | sys.displayhook = self.display_hook_factory(self.session, iopub_socket) |
|
201 | sys.displayhook = self.display_hook_factory(self.session, iopub_socket) | |
202 |
sys.displayhook.topic = |
|
202 | sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id) | |
203 |
|
203 | |||
204 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, |
|
204 | self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, | |
205 | control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, |
|
205 | control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, |
@@ -124,12 +124,6 b' def log_errors(f, self, *args, **kwargs):' | |||||
124 | self.log.error("Uncaught exception in %r" % f, exc_info=True) |
|
124 | self.log.error("Uncaught exception in %r" % f, exc_info=True) | |
125 |
|
125 | |||
126 |
|
126 | |||
127 | def asbytes(s): |
|
|||
128 | """ensure that an object is ascii bytes""" |
|
|||
129 | if isinstance(s, unicode): |
|
|||
130 | s = s.encode('ascii') |
|
|||
131 | return s |
|
|||
132 |
|
||||
133 | def is_url(url): |
|
127 | def is_url(url): | |
134 | """boolean check for whether a string is a zmq url""" |
|
128 | """boolean check for whether a string is a zmq url""" | |
135 | if '://' not in url: |
|
129 | if '://' not in url: |
General Comments 0
You need to be logged in to leave comments.
Login now