diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 9ac1308..d62ebd0 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -54,7 +54,7 @@ from IPython.parallel.controller.hub import HubFactory from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler from IPython.parallel.controller.sqlitedb import SQLiteDB -from IPython.parallel.util import signal_children, split_url +from IPython.parallel.util import signal_children, split_url, ensure_bytes # conditional import of MongoDB backend class @@ -202,14 +202,13 @@ class IPControllerApp(BaseParallelApplication): # load from engine config with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f: cfg = json.loads(f.read()) - key = c.Session.key = cfg['exec_key'] + key = c.Session.key = ensure_bytes(cfg['exec_key']) xport,addr = cfg['url'].split('://') c.HubFactory.engine_transport = xport ip,ports = addr.split(':') c.HubFactory.engine_ip = ip c.HubFactory.regport = int(ports) self.location = cfg['location'] - # load client config with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f: cfg = json.loads(f.read()) @@ -240,9 +239,9 @@ class IPControllerApp(BaseParallelApplication): # with open(keyfile, 'w') as f: # f.write(key) # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) - c.Session.key = key + c.Session.key = ensure_bytes(key) else: - key = c.Session.key = '' + key = c.Session.key = b'' try: self.factory = HubFactory(config=c, log=self.log) @@ -273,27 +272,27 @@ class IPControllerApp(BaseParallelApplication): hub = self.factory # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url # IOPub relay (in a Process) - q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') + q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') q.bind_in(hub.client_info['iopub']) q.bind_out(hub.engine_info['iopub']) - q.setsockopt_out(zmq.SUBSCRIBE, '') + q.setsockopt_out(zmq.SUBSCRIBE, b'') q.connect_mon(hub.monitor_url) q.daemon=True children.append(q) # Multiplexer Queue (in a Process) - q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') + q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out') q.bind_in(hub.client_info['mux']) - q.setsockopt_in(zmq.IDENTITY, 'mux') + q.setsockopt_in(zmq.IDENTITY, b'mux') q.bind_out(hub.engine_info['mux']) q.connect_mon(hub.monitor_url) q.daemon=True children.append(q) # Control Queue (in a Process) - q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') + q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol') q.bind_in(hub.client_info['control']) - q.setsockopt_in(zmq.IDENTITY, 'control') + q.setsockopt_in(zmq.IDENTITY, b'control') q.bind_out(hub.engine_info['control']) q.connect_mon(hub.monitor_url) q.daemon=True @@ -305,10 +304,10 @@ class IPControllerApp(BaseParallelApplication): # Task Queue (in a Process) if scheme == 'pure': self.log.warn("task::using pure XREQ Task scheduler") - q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') + q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask') # q.setsockopt_out(zmq.HWM, hub.hwm) q.bind_in(hub.client_info['task'][1]) - q.setsockopt_in(zmq.IDENTITY, 'task') + q.setsockopt_in(zmq.IDENTITY, b'task') q.bind_out(hub.engine_info['task']) q.connect_mon(hub.monitor_url) q.daemon=True diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index 9e6ba31..b84be79 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -41,7 +41,7 @@ from IPython.config.configurable import Configurable from IPython.zmq.session import Session from IPython.parallel.engine.engine import EngineFactory from IPython.parallel.engine.streamkernel import Kernel -from IPython.parallel.util import disambiguate_url +from IPython.parallel.util import disambiguate_url, ensure_bytes from IPython.utils.importstring import import_item from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float @@ -216,11 +216,8 @@ class IPEngineApp(BaseParallelApplication): self.log.info("Loading url_file %r"%self.url_file) with open(self.url_file) as f: d = json.loads(f.read()) - for k,v in d.iteritems(): - if isinstance(v, unicode): - d[k] = v.encode() if d['exec_key']: - config.Session.key = d['exec_key'] + config.Session.key = ensure_bytes(d['exec_key']) d['url'] = disambiguate_url(d['url'], d['location']) config.EngineFactory.url = d['url'] config.EngineFactory.location = d['location'] diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 8a8c568..958d08e 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -17,6 +17,7 @@ Authors: import os import json +import sys import time import warnings from datetime import datetime @@ -48,6 +49,11 @@ from .asyncresult import AsyncResult, AsyncHubResult from IPython.core.profiledir import ProfileDir, ProfileDirError from .view import DirectView, LoadBalancedView +if sys.version_info[0] >= 3: + # xrange is used in a coupe 'isinstance' tests in py2 + # should be just 'range' in 3k + xrange = range + #-------------------------------------------------------------------------- # Decorators for Client methods #-------------------------------------------------------------------------- @@ -356,13 +362,12 @@ class Client(HasTraits): if os.path.isfile(exec_key): extra_args['keyfile'] = exec_key else: - if isinstance(exec_key, unicode): - exec_key = exec_key.encode('ascii') + exec_key = util.ensure_bytes(exec_key) extra_args['key'] = exec_key self.session = Session(**extra_args) self._query_socket = self._context.socket(zmq.XREQ) - self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) + self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) if self._ssh: tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) else: @@ -404,7 +409,7 @@ class Client(HasTraits): """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" for k,v in engines.iteritems(): eid = int(k) - self._engines[eid] = bytes(v) # force not unicode + self._engines[eid] = v self._ids.append(eid) self._ids = sorted(self._ids) if sorted(self._engines.keys()) != range(len(self._engines)) and \ @@ -455,7 +460,7 @@ class Client(HasTraits): if not isinstance(targets, (tuple, list, xrange)): raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) - return [self._engines[t] for t in targets], list(targets) + return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets) def _connect(self, sshserver, ssh_kwargs, timeout): """setup all our socket connections to the cluster. This is called from @@ -488,14 +493,15 @@ class Client(HasTraits): content = msg.content self._config['registration'] = dict(content) if content.status == 'ok': + ident = util.ensure_bytes(self.session.session) if content.mux: self._mux_socket = self._context.socket(zmq.XREQ) - self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) + self._mux_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._mux_socket, content.mux) if content.task: self._task_scheme, task_addr = content.task self._task_socket = self._context.socket(zmq.XREQ) - self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) + self._task_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._task_socket, task_addr) if content.notification: self._notification_socket = self._context.socket(zmq.SUB) @@ -507,12 +513,12 @@ class Client(HasTraits): # connect_socket(self._query_socket, content.query) if content.control: self._control_socket = self._context.socket(zmq.XREQ) - self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) + self._control_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._control_socket, content.control) if content.iopub: self._iopub_socket = self._context.socket(zmq.SUB) self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') - self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) + self._iopub_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._iopub_socket, content.iopub) self._update_engines(dict(content.engines)) else: diff --git a/IPython/parallel/client/map.py b/IPython/parallel/client/map.py index 5ea8d96..1829d8c 100644 --- a/IPython/parallel/client/map.py +++ b/IPython/parallel/client/map.py @@ -13,8 +13,6 @@ Authors: """ -__docformat__ = "restructuredtext en" - #------------------------------------------------------------------------------- # Copyright (C) 2008-2011 The IPython Development Team # @@ -26,6 +24,8 @@ __docformat__ = "restructuredtext en" # Imports #------------------------------------------------------------------------------- +from __future__ import division + import types from IPython.utils.data import flatten as utils_flatten @@ -67,7 +67,7 @@ class Map: return remainder = len(seq)%q - basesize = len(seq)/q + basesize = len(seq)//q hi = [] lo = [] for n in range(q): diff --git a/IPython/parallel/client/remotefunction.py b/IPython/parallel/client/remotefunction.py index cdaccdf..f3897bf 100644 --- a/IPython/parallel/client/remotefunction.py +++ b/IPython/parallel/client/remotefunction.py @@ -16,6 +16,8 @@ Authors: # Imports #----------------------------------------------------------------------------- +from __future__ import division + import warnings from IPython.testing.skipdoctest import skip_doctest @@ -142,7 +144,7 @@ class ParallelFunction(RemoteFunction): balanced = 'Balanced' in self.view.__class__.__name__ if balanced: if self.chunksize: - nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0) + nparts = len_0//self.chunksize + int(len_0%self.chunksize > 0) else: nparts = len_0 targets = [None]*nparts @@ -169,7 +171,10 @@ class ParallelFunction(RemoteFunction): # print (args) if hasattr(self, '_map'): - f = map + if sys.version_info[0] >= 3: + f = lambda f, *sequences: list(map(f, *sequences)) + else: + f = map args = [self.func]+args else: f=self.func diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py index be35241..c604bbb 100644 --- a/IPython/parallel/client/view.py +++ b/IPython/parallel/client/view.py @@ -969,6 +969,8 @@ class LoadBalancedView(View): idents = [] else: idents = self.client._build_targets(targets)[0] + # ensure *not* bytes + idents = [ ident.decode() for ident in idents ] after = self._render_dependency(after) follow = self._render_dependency(follow) diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index 5e231a3..47658c8 100644 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -25,6 +25,8 @@ from zmq.eventloop import ioloop, zmqstream from IPython.config.configurable import LoggingConfigurable from IPython.utils.traitlets import Set, Instance, CFloat +from IPython.parallel.util import ensure_bytes + class Heart(object): """A basic heart object for responding to a HeartMonitor. This is a simple wrapper with defaults for the most common @@ -42,9 +44,9 @@ class Heart(object): self.device.connect_in(in_addr) self.device.connect_out(out_addr) if in_type == zmq.SUB: - self.device.setsockopt_in(zmq.SUBSCRIBE, "") + self.device.setsockopt_in(zmq.SUBSCRIBE, b"") if heart_id is None: - heart_id = str(uuid.uuid4()) + heart_id = ensure_bytes(uuid.uuid4()) self.device.setsockopt_out(zmq.IDENTITY, heart_id) self.id = heart_id @@ -115,7 +117,7 @@ class HeartMonitor(LoggingConfigurable): self.responses = set() # print self.on_probation, self.hearts # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) - self.pingstream.send(str(self.lifetime)) + self.pingstream.send(ensure_bytes(str(self.lifetime))) def handle_new_heart(self, heart): if self._new_handlers: @@ -140,11 +142,13 @@ class HeartMonitor(LoggingConfigurable): def handle_pong(self, msg): "a heart just beat" - if msg[1] == str(self.lifetime): + current = ensure_bytes(str(self.lifetime)) + last = ensure_bytes(str(self.last_ping)) + if msg[1] == current: delta = time.time()-self.tic # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) self.responses.add(msg[0]) - elif msg[1] == str(self.last_ping): + elif msg[1] == last: delta = time.time()-self.tic + (self.lifetime-self.last_ping) self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) self.responses.add(msg[0]) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 7ee5977..f601ce5 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -289,7 +289,7 @@ class HubFactory(RegistrationFactory): # resubmit stream r = ZMQStream(ctx.socket(zmq.XREQ), loop) url = util.disambiguate_url(self.client_info['task'][-1]) - r.setsockopt(zmq.IDENTITY, self.session.session) + r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) r.connect(url) self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, @@ -380,14 +380,14 @@ class Hub(SessionFactory): self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) self.heartmonitor.add_new_heart_handler(self.handle_new_heart) - self.monitor_handlers = { 'in' : self.save_queue_request, - 'out': self.save_queue_result, - 'intask': self.save_task_request, - 'outtask': self.save_task_result, - 'tracktask': self.save_task_destination, - 'incontrol': _passer, - 'outcontrol': _passer, - 'iopub': self.save_iopub_message, + self.monitor_handlers = {b'in' : self.save_queue_request, + b'out': self.save_queue_result, + b'intask': self.save_task_request, + b'outtask': self.save_task_result, + b'tracktask': self.save_task_destination, + b'incontrol': _passer, + b'outcontrol': _passer, + b'iopub': self.save_iopub_message, } self.query_handlers = {'queue_request': self.queue_status, @@ -562,8 +562,9 @@ class Hub(SessionFactory): return record = init_record(msg) msg_id = record['msg_id'] - record['engine_uuid'] = queue_id - record['client_uuid'] = client_id + # Unicode in records + record['engine_uuid'] = queue_id.decode('utf8', 'replace') + record['client_uuid'] = client_id.decode('utf8', 'replace') record['queue'] = 'mux' try: @@ -751,7 +752,7 @@ class Hub(SessionFactory): # print (content) msg_id = content['msg_id'] engine_uuid = content['engine_id'] - eid = self.by_ident[engine_uuid] + eid = self.by_ident[util.ensure_bytes(engine_uuid)] self.log.info("task::task %r arrived on %r"%(msg_id, eid)) if msg_id in self.unassigned: @@ -833,7 +834,7 @@ class Hub(SessionFactory): jsonable = {} for k,v in self.keytable.iteritems(): if v not in self.dead_engines: - jsonable[str(k)] = v + jsonable[str(k)] = v.decode() content['engines'] = jsonable self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) @@ -841,11 +842,13 @@ class Hub(SessionFactory): """Register a new engine.""" content = msg['content'] try: - queue = content['queue'] + queue = util.ensure_bytes(content['queue']) except KeyError: self.log.error("registration::queue not specified", exc_info=True) return heart = content.get('heartbeat', None) + if heart: + heart = util.ensure_bytes(heart) """register a new engine, and create the socket(s) necessary""" eid = self._next_id # print (eid, queue, reg, heart) @@ -912,7 +915,7 @@ class Hub(SessionFactory): self.log.info("registration::unregister_engine(%r)"%eid) # print (eid) uuid = self.keytable[eid] - content=dict(id=eid, queue=uuid) + content=dict(id=eid, queue=uuid.decode()) self.dead_engines.add(uuid) # self.ids.remove(eid) # uuid = self.keytable.pop(eid) @@ -980,7 +983,7 @@ class Hub(SessionFactory): self.tasks[eid] = list() self.completed[eid] = list() self.hearts[heart] = eid - content = dict(id=eid, queue=self.engines[eid].queue) + content = dict(id=eid, queue=self.engines[eid].queue.decode()) if self.notifier: self.session.send(self.notifier, "registration_notification", content=content) self.log.info("engine::Engine Connected: %i"%eid) @@ -1054,9 +1057,9 @@ class Hub(SessionFactory): queue = len(queue) completed = len(completed) tasks = len(tasks) - content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} + content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) - + # print (content) self.session.send(self.query, "queue_reply", content=content, ident=client_id) def purge_results(self, client_id, msg): @@ -1179,7 +1182,7 @@ class Hub(SessionFactory): 'io' : io_dict, } if rec['result_buffers']: - buffers = map(str, rec['result_buffers']) + buffers = map(bytes, rec['result_buffers']) else: buffers = [] @@ -1281,7 +1284,7 @@ class Hub(SessionFactory): buffers.extend(rb) content = dict(status='ok', records=records, buffer_lens=buffer_lens, result_buffer_lens=result_buffer_lens) - + # self.log.debug (content) self.session.send(self.query, "db_reply", content=content, parent=msg, ident=client_id, buffers=buffers) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index af76c86..55ee059 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -40,11 +40,11 @@ from zmq.eventloop import ioloop, zmqstream from IPython.external.decorator import decorator from IPython.config.application import Application from IPython.config.loader import Config -from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum +from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes from IPython.parallel import error from IPython.parallel.factory import SessionFactory -from IPython.parallel.util import connect_logger, local_logger +from IPython.parallel.util import connect_logger, local_logger, ensure_bytes from .dependency import Dependency @@ -174,6 +174,10 @@ class TaskScheduler(SessionFactory): blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') + ident = CBytes() # ZMQ identity. This should just be self.session.session + # but ensure Bytes + def _ident_default(self): + return ensure_bytes(self.session.session) def start(self): self.engine_stream.on_recv(self.dispatch_result, copy=False) @@ -204,7 +208,7 @@ class TaskScheduler(SessionFactory): try: idents,msg = self.session.feed_identities(msg) except ValueError: - self.log.warn("task::Invalid Message: %r"%msg) + self.log.warn("task::Invalid Message: %r",msg) return try: msg = self.session.unpack_message(msg) @@ -219,15 +223,16 @@ class TaskScheduler(SessionFactory): self.log.error("Unhandled message type: %r"%msg_type) else: try: - handler(str(msg['content']['queue'])) - except KeyError: - self.log.error("task::Invalid notification msg: %r"%msg) + handler(ensure_bytes(msg['content']['queue'])) + except Exception: + self.log.error("task::Invalid notification msg: %r",msg) def _register_engine(self, uid): """New engine with ident `uid` became available.""" # head of the line: self.targets.insert(0,uid) self.loads.insert(0,0) + # initialize sets self.completed[uid] = set() self.failed[uid] = set() @@ -309,14 +314,18 @@ class TaskScheduler(SessionFactory): # send to monitor - self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) + self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) header = msg['header'] msg_id = header['msg_id'] self.all_ids.add(msg_id) - # targets - targets = set(header.get('targets', [])) + # get targets as a set of bytes objects + # from a list of unicode objects + targets = header.get('targets', []) + targets = map(ensure_bytes, targets) + targets = set(targets) + retries = header.get('retries', 0) self.retries[msg_id] = retries @@ -412,7 +421,7 @@ class TaskScheduler(SessionFactory): msg = self.session.send(self.client_stream, 'apply_reply', content, parent=header, ident=idents) - self.session.send(self.mon_stream, msg, ident=['outtask']+idents) + self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents) self.update_graph(msg_id, success=False) @@ -494,9 +503,9 @@ class TaskScheduler(SessionFactory): self.add_job(idx) self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) # notify Hub - content = dict(msg_id=msg_id, engine_id=target) + content = dict(msg_id=msg_id, engine_id=target.decode()) self.session.send(self.mon_stream, 'task_destination', content=content, - ident=['tracktask',self.session.session]) + ident=[b'tracktask',self.ident]) #----------------------------------------------------------------------- @@ -533,7 +542,7 @@ class TaskScheduler(SessionFactory): # relay to client and update graph self.handle_result(idents, parent, raw_msg, success) # send to Hub monitor - self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False) + self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False) else: self.handle_unmet_dependency(idents, parent) diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index d7390a6..167cfd7 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -28,6 +28,12 @@ from IPython.utils.jsonutil import date_default, extract_dates, squash_dates # SQLite operators, adapters, and converters #----------------------------------------------------------------------------- +try: + buffer +except NameError: + # py3k + buffer = memoryview + operators = { '$lt' : "<", '$gt' : ">", @@ -54,7 +60,11 @@ def _convert_dict(ds): if ds is None: return ds else: - return extract_dates(json.loads(ds)) + if isinstance(ds, bytes): + # If I understand the sqlite doc correctly, this will always be utf8 + ds = ds.decode('utf8') + d = json.loads(ds) + return extract_dates(d) def _adapt_bufs(bufs): # this is *horrible* diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 80da049..43e911b 100755 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -28,7 +28,7 @@ from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode from IPython.parallel.controller.heartmonitor import Heart from IPython.parallel.factory import RegistrationFactory -from IPython.parallel.util import disambiguate_url +from IPython.parallel.util import disambiguate_url, ensure_bytes from IPython.zmq.session import Message @@ -65,7 +65,7 @@ class EngineFactory(RegistrationFactory): ctx = self.context reg = ctx.socket(zmq.XREQ) - reg.setsockopt(zmq.IDENTITY, self.ident) + reg.setsockopt(zmq.IDENTITY, ensure_bytes(self.ident)) reg.connect(self.url) self.registrar = zmqstream.ZMQStream(reg, self.loop) @@ -83,7 +83,7 @@ class EngineFactory(RegistrationFactory): self._abort_dc.stop() ctx = self.context loop = self.loop - identity = self.ident + identity = ensure_bytes(self.ident) idents,msg = self.session.feed_identities(msg) msg = Message(self.session.unpack_message(msg)) diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py index 1df2d26..73bf8cc 100755 --- a/IPython/parallel/engine/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -35,12 +35,12 @@ import zmq from zmq.eventloop import ioloop, zmqstream # Local imports. -from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode +from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes from IPython.zmq.completer import KernelCompleter from IPython.parallel.error import wrap_exception from IPython.parallel.factory import SessionFactory -from IPython.parallel.util import serialize_object, unpack_apply_message +from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes def printer(*args): pprint(args, stream=sys.__stdout__) @@ -73,8 +73,14 @@ class Kernel(SessionFactory): # kwargs: exec_lines = List(Unicode, config=True, help="List of lines to execute") - + + # identities: int_id = Int(-1) + bident = CBytes() + ident = Unicode() + def _ident_changed(self, name, old, new): + self.bident = ensure_bytes(new) + user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""") control_stream = Instance(zmqstream.ZMQStream) @@ -193,6 +199,8 @@ class Kernel(SessionFactory): except: self.log.error("Invalid Message", exc_info=True) return + else: + self.log.debug("Control received, %s", msg) header = msg['header'] msg_id = header['msg_id'] @@ -247,7 +255,7 @@ class Kernel(SessionFactory): self.log.error("Got bad msg: %s"%parent, exc_info=True) return self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, - ident='%s.pyin'%self.prefix) + ident=ensure_bytes('%s.pyin'%self.prefix)) started = datetime.now() try: comp_code = self.compiler(code, '') @@ -261,7 +269,7 @@ class Kernel(SessionFactory): exc_content = self._wrap_exception('execute') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident='%s.pyerr'%self.prefix) + ident=ensure_bytes('%s.pyerr'%self.prefix)) reply_content = exc_content else: reply_content = {'status' : 'ok'} @@ -285,7 +293,6 @@ class Kernel(SessionFactory): def apply_request(self, stream, ident, parent): # flush previous reply, so this request won't block it stream.flush(zmq.POLLOUT) - try: content = parent[u'content'] bufs = parent[u'buffers'] @@ -341,7 +348,7 @@ class Kernel(SessionFactory): exc_content = self._wrap_exception('apply') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident='%s.pyerr'%self.prefix) + ident=ensure_bytes('%s.pyerr'%self.prefix)) reply_content = exc_content result_buf = [] @@ -370,6 +377,8 @@ class Kernel(SessionFactory): except: self.log.error("Invalid Message", exc_info=True) return + else: + self.log.debug("Message received, %s", msg) header = msg['header'] diff --git a/IPython/parallel/tests/clienttest.py b/IPython/parallel/tests/clienttest.py index 4ca0a24..6f22d88 100644 --- a/IPython/parallel/tests/clienttest.py +++ b/IPython/parallel/tests/clienttest.py @@ -41,9 +41,11 @@ def crash(): if sys.platform.startswith('win'): import ctypes ctypes.windll.kernel32.SetErrorMode(0x0002); - - co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00', - (), (), (), '', '', 1, b'') + args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b''] + if sys.version_info[0] >= 3: + args.insert(1, 0) + + co = types.CodeType(*args) exec(co) def wait(n): diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index 5d76107..f9448ad 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -57,7 +57,7 @@ class AsyncResultTest(ClusterTestCase): def test_get_after_error(self): ar = self.client[-1].apply_async(lambda : 1/0) - ar.wait() + ar.wait(10) self.assertRaisesRemote(ZeroDivisionError, ar.get) self.assertRaisesRemote(ZeroDivisionError, ar.get) self.assertRaisesRemote(ZeroDivisionError, ar.get_dict) diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 2153ba8..42c3b60 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -16,6 +16,8 @@ Authors: # Imports #------------------------------------------------------------------------------- +from __future__ import division + import time from datetime import datetime from tempfile import mktemp @@ -132,7 +134,9 @@ class TestClient(ClusterTestCase): self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) allqs = self.client.queue_status() self.assertTrue(isinstance(allqs, dict)) - self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned'])) + intkeys = list(allqs.keys()) + intkeys.remove('unassigned') + self.assertEquals(sorted(intkeys), sorted(self.client.ids)) unassigned = allqs.pop('unassigned') for eid,qs in allqs.items(): self.assertTrue(isinstance(qs, dict)) @@ -156,7 +160,7 @@ class TestClient(ClusterTestCase): def test_db_query_dt(self): """test db query by date""" hist = self.client.hub_history() - middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0] + middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0] tic = middle['submitted'] before = self.client.db_query({'submitted' : {'$lt' : tic}}) after = self.client.db_query({'submitted' : {'$gte' : tic}}) diff --git a/IPython/parallel/tests/test_db.py b/IPython/parallel/tests/test_db.py index aee3b4a..fddb961 100644 --- a/IPython/parallel/tests/test_db.py +++ b/IPython/parallel/tests/test_db.py @@ -16,6 +16,7 @@ Authors: # Imports #------------------------------------------------------------------------------- +from __future__ import division import tempfile import time @@ -100,7 +101,7 @@ class TestDictBackend(TestCase): def test_find_records_dt(self): """test finding records by date""" hist = self.db.get_history() - middle = self.db.get_record(hist[len(hist)/2]) + middle = self.db.get_record(hist[len(hist)//2]) tic = middle['submitted'] before = self.db.find_records({'submitted' : {'$lt' : tic}}) after = self.db.find_records({'submitted' : {'$gte' : tic}}) @@ -168,7 +169,7 @@ class TestDictBackend(TestCase): query = {'msg_id' : {'$in':msg_ids}} self.db.drop_matching_records(query) recs = self.db.find_records(query) - self.assertTrue(len(recs)==0) + self.assertEquals(len(recs), 0) class TestSQLiteBackend(TestDictBackend): def create_db(self): diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py index 0407488..b07ec31 100644 --- a/IPython/parallel/tests/test_view.py +++ b/IPython/parallel/tests/test_view.py @@ -43,7 +43,7 @@ class TestView(ClusterTestCase): # self.add_engines(1) eid = self.client.ids[-1] ar = self.client[eid].apply_async(crash) - self.assertRaisesRemote(error.EngineError, ar.get) + self.assertRaisesRemote(error.EngineError, ar.get, 10) eid = ar.engine_id tic = time.time() while eid in self.client.ids and time.time()-tic < 5: @@ -413,7 +413,10 @@ class TestView(ClusterTestCase): """test executing unicode strings""" v = self.client[-1] v.block=True - code=u"a=u'é'" + if sys.version_info[0] >= 3: + code="a='é'" + else: + code=u"a=u'é'" v.execute(code) self.assertEquals(v['a'], u'é') @@ -433,7 +436,7 @@ class TestView(ClusterTestCase): assert isinstance(check, bytes), "%r is not bytes"%check assert a.encode('utf8') == check, "%s != %s"%(a,check) - for s in [ u'é', u'ßø®∫','asdf'.decode() ]: + for s in [ u'é', u'ßø®∫',u'asdf' ]: try: v.apply_sync(check_unicode, s, s.encode('utf8')) except error.RemoteError as e: diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index e366284..fe09943 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -101,6 +101,12 @@ class ReverseDict(dict): # Functions #----------------------------------------------------------------------------- +def ensure_bytes(s): + """ensure that an object is bytes""" + if isinstance(s, unicode): + s = s.encode(sys.getdefaultencoding(), 'replace') + return s + def validate_url(url): """validate a url for zeromq""" if not isinstance(url, basestring): diff --git a/IPython/utils/codeutil.py b/IPython/utils/codeutil.py index 31e0361..08f4155 100644 --- a/IPython/utils/codeutil.py +++ b/IPython/utils/codeutil.py @@ -23,6 +23,7 @@ __docformat__ = "restructuredtext en" # Imports #------------------------------------------------------------------------------- +import sys import new, types, copy_reg def code_ctor(*args): @@ -31,9 +32,12 @@ def code_ctor(*args): def reduce_code(co): if co.co_freevars or co.co_cellvars: raise ValueError("Sorry, cannot pickle code objects with closures") - return code_ctor, (co.co_argcount, co.co_nlocals, co.co_stacksize, - co.co_flags, co.co_code, co.co_consts, co.co_names, - co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, - co.co_lnotab) + args = [co.co_argcount, co.co_nlocals, co.co_stacksize, + co.co_flags, co.co_code, co.co_consts, co.co_names, + co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, + co.co_lnotab] + if sys.version_info[0] >= 3: + args.insert(1, co.co_kwonlyargcount) + return code_ctor, tuple(args) copy_reg.pickle(types.CodeType, reduce_code) \ No newline at end of file diff --git a/IPython/utils/newserialized.py b/IPython/utils/newserialized.py index 8fcab1f..d9259d2 100644 --- a/IPython/utils/newserialized.py +++ b/IPython/utils/newserialized.py @@ -19,16 +19,23 @@ __test__ = {} # Imports #------------------------------------------------------------------------------- +import sys import cPickle as pickle try: import numpy except ImportError: - pass + numpy = None class SerializationError(Exception): pass +if sys.version_info[0] >= 3: + buffer = memoryview + py3k = True +else: + py3k = False + #----------------------------------------------------------------------------- # Classes and functions #----------------------------------------------------------------------------- @@ -93,8 +100,11 @@ class SerializeIt(object): def __init__(self, unSerialized): self.data = None self.obj = unSerialized.getObject() - if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray): - if len(self.obj.shape) == 0: # length 0 arrays are just pickled + if numpy is not None and isinstance(self.obj, numpy.ndarray): + if py3k or len(self.obj.shape) == 0: # length 0 arrays are just pickled + # FIXME: + # also use pickle for numpy arrays on py3k, since + # pyzmq doesn't rebuild from memoryviews properly self.typeDescriptor = 'pickle' self.metadata = {} else: @@ -102,7 +112,7 @@ class SerializeIt(object): self.typeDescriptor = 'ndarray' self.metadata = {'shape':self.obj.shape, 'dtype':self.obj.dtype.str} - elif isinstance(self.obj, str): + elif isinstance(self.obj, bytes): self.typeDescriptor = 'bytes' self.metadata = {} elif isinstance(self.obj, buffer): @@ -146,9 +156,9 @@ class UnSerializeIt(UnSerialized): def getObject(self): typeDescriptor = self.serialized.getTypeDescriptor() - if globals().has_key('numpy') and typeDescriptor == 'ndarray': + if numpy is not None and typeDescriptor == 'ndarray': buf = self.serialized.getData() - if isinstance(buf, (str, buffer)): + if isinstance(buf, (bytes, buffer)): result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype']) else: # memoryview