From a82262e59e85f16d1b03771d70769f4d550f76b3 2011-07-03 05:09:39
From: MinRK <benjaminrk@gmail.com>
Date: 2011-07-03 05:09:39
Subject: [PATCH] update parallel code for py3k

This is primarily tweaks of bytes/unicode, but other fixes include:

* some integer division
* added co_kwonlyargcount to code objects
* a few places to handle map/range being objects

Non-copying numpy is disabled on py3k because arrays are not reconstructed properly on the other side: pyzmq
always receives a plain byte array (memoryview.itemsize=1), which confuses numpy.

With these changes, almost all parallel tests pass. Notable exceptions are sync_imports and MUX engine death.

---

diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py
index 9ac1308..d62ebd0 100755
--- a/IPython/parallel/apps/ipcontrollerapp.py
+++ b/IPython/parallel/apps/ipcontrollerapp.py
@@ -54,7 +54,7 @@ from IPython.parallel.controller.hub import HubFactory
 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
 from IPython.parallel.controller.sqlitedb import SQLiteDB
 
-from IPython.parallel.util import signal_children, split_url
+from IPython.parallel.util import signal_children, split_url, ensure_bytes
 
 # conditional import of MongoDB backend class
 
@@ -202,14 +202,13 @@ class IPControllerApp(BaseParallelApplication):
         # load from engine config
         with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
             cfg = json.loads(f.read())
-        key = c.Session.key = cfg['exec_key']
+        key = c.Session.key = ensure_bytes(cfg['exec_key'])
         xport,addr = cfg['url'].split('://')
         c.HubFactory.engine_transport = xport
         ip,ports = addr.split(':')
         c.HubFactory.engine_ip = ip
         c.HubFactory.regport = int(ports)
         self.location = cfg['location']
-        
         # load client config
         with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
             cfg = json.loads(f.read())
@@ -240,9 +239,9 @@ class IPControllerApp(BaseParallelApplication):
             # with open(keyfile, 'w') as f:
             #     f.write(key)
             # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
-            c.Session.key = key
+            c.Session.key = ensure_bytes(key)
         else:
-            key = c.Session.key = ''
+            key = c.Session.key = b''
         
         try:
             self.factory = HubFactory(config=c, log=self.log)
@@ -273,27 +272,27 @@ class IPControllerApp(BaseParallelApplication):
         hub = self.factory
         # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
         # IOPub relay (in a Process)
-        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
+        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
         q.bind_in(hub.client_info['iopub'])
         q.bind_out(hub.engine_info['iopub'])
-        q.setsockopt_out(zmq.SUBSCRIBE, '')
+        q.setsockopt_out(zmq.SUBSCRIBE, b'')
         q.connect_mon(hub.monitor_url)
         q.daemon=True
         children.append(q)
 
         # Multiplexer Queue (in a Process)
-        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
+        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
         q.bind_in(hub.client_info['mux'])
-        q.setsockopt_in(zmq.IDENTITY, 'mux')
+        q.setsockopt_in(zmq.IDENTITY, b'mux')
         q.bind_out(hub.engine_info['mux'])
         q.connect_mon(hub.monitor_url)
         q.daemon=True
         children.append(q)
 
         # Control Queue (in a Process)
-        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
+        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
         q.bind_in(hub.client_info['control'])
-        q.setsockopt_in(zmq.IDENTITY, 'control')
+        q.setsockopt_in(zmq.IDENTITY, b'control')
         q.bind_out(hub.engine_info['control'])
         q.connect_mon(hub.monitor_url)
         q.daemon=True
@@ -305,10 +304,10 @@ class IPControllerApp(BaseParallelApplication):
         # Task Queue (in a Process)
         if scheme == 'pure':
             self.log.warn("task::using pure XREQ Task scheduler")
-            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
+            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
             # q.setsockopt_out(zmq.HWM, hub.hwm)
             q.bind_in(hub.client_info['task'][1])
-            q.setsockopt_in(zmq.IDENTITY, 'task')
+            q.setsockopt_in(zmq.IDENTITY, b'task')
             q.bind_out(hub.engine_info['task'])
             q.connect_mon(hub.monitor_url)
             q.daemon=True
diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py
index 9e6ba31..b84be79 100755
--- a/IPython/parallel/apps/ipengineapp.py
+++ b/IPython/parallel/apps/ipengineapp.py
@@ -41,7 +41,7 @@ from IPython.config.configurable import Configurable
 from IPython.zmq.session import Session
 from IPython.parallel.engine.engine import EngineFactory
 from IPython.parallel.engine.streamkernel import Kernel
-from IPython.parallel.util import disambiguate_url
+from IPython.parallel.util import disambiguate_url, ensure_bytes
 
 from IPython.utils.importstring import import_item
 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
@@ -216,11 +216,8 @@ class IPEngineApp(BaseParallelApplication):
             self.log.info("Loading url_file %r"%self.url_file)
             with open(self.url_file) as f:
                 d = json.loads(f.read())
-                for k,v in d.iteritems():
-                    if isinstance(v, unicode):
-                        d[k] = v.encode()
             if d['exec_key']:
-                config.Session.key = d['exec_key']
+                config.Session.key = ensure_bytes(d['exec_key'])
             d['url'] = disambiguate_url(d['url'], d['location'])
             config.EngineFactory.url = d['url']
             config.EngineFactory.location = d['location']
diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index 8a8c568..958d08e 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -17,6 +17,7 @@ Authors:
 
 import os
 import json
+import sys
 import time
 import warnings
 from datetime import datetime
@@ -48,6 +49,11 @@ from .asyncresult import AsyncResult, AsyncHubResult
 from IPython.core.profiledir import ProfileDir, ProfileDirError
 from .view import DirectView, LoadBalancedView
 
+if sys.version_info[0] >= 3:
+    # xrange is used in a couple of 'isinstance' tests in py2;
+    # it should be just 'range' in 3k
+    xrange = range
+
 #--------------------------------------------------------------------------
 # Decorators for Client methods
 #--------------------------------------------------------------------------
@@ -356,13 +362,12 @@ class Client(HasTraits):
             if os.path.isfile(exec_key):
                 extra_args['keyfile'] = exec_key
             else:
-                if isinstance(exec_key, unicode):
-                    exec_key = exec_key.encode('ascii')
+                exec_key = util.ensure_bytes(exec_key)
                 extra_args['key'] = exec_key
         self.session = Session(**extra_args)
         
         self._query_socket = self._context.socket(zmq.XREQ)
-        self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
+        self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
         if self._ssh:
             tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
         else:
@@ -404,7 +409,7 @@ class Client(HasTraits):
         """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
         for k,v in engines.iteritems():
             eid = int(k)
-            self._engines[eid] = bytes(v) # force not unicode
+            self._engines[eid] = v
             self._ids.append(eid)
         self._ids = sorted(self._ids)
         if sorted(self._engines.keys()) != range(len(self._engines)) and \
@@ -455,7 +460,7 @@ class Client(HasTraits):
         if not isinstance(targets, (tuple, list, xrange)):
             raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
             
-        return [self._engines[t] for t in targets], list(targets)
+        return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets)
     
     def _connect(self, sshserver, ssh_kwargs, timeout):
         """setup all our socket connections to the cluster. This is called from
@@ -488,14 +493,15 @@ class Client(HasTraits):
         content = msg.content
         self._config['registration'] = dict(content)
         if content.status == 'ok':
+            ident = util.ensure_bytes(self.session.session)
             if content.mux:
                 self._mux_socket = self._context.socket(zmq.XREQ)
-                self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
+                self._mux_socket.setsockopt(zmq.IDENTITY, ident)
                 connect_socket(self._mux_socket, content.mux)
             if content.task:
                 self._task_scheme, task_addr = content.task
                 self._task_socket = self._context.socket(zmq.XREQ)
-                self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
+                self._task_socket.setsockopt(zmq.IDENTITY, ident)
                 connect_socket(self._task_socket, task_addr)
             if content.notification:
                 self._notification_socket = self._context.socket(zmq.SUB)
@@ -507,12 +513,12 @@ class Client(HasTraits):
             #     connect_socket(self._query_socket, content.query)
             if content.control:
                 self._control_socket = self._context.socket(zmq.XREQ)
-                self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
+                self._control_socket.setsockopt(zmq.IDENTITY, ident)
                 connect_socket(self._control_socket, content.control)
             if content.iopub:
                 self._iopub_socket = self._context.socket(zmq.SUB)
                 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
-                self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
+                self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
                 connect_socket(self._iopub_socket, content.iopub)
             self._update_engines(dict(content.engines))
         else:
diff --git a/IPython/parallel/client/map.py b/IPython/parallel/client/map.py
index 5ea8d96..1829d8c 100644
--- a/IPython/parallel/client/map.py
+++ b/IPython/parallel/client/map.py
@@ -13,8 +13,6 @@ Authors:
 
 """
 
-__docformat__ = "restructuredtext en"
-
 #-------------------------------------------------------------------------------
 #  Copyright (C) 2008-2011  The IPython Development Team
 #
@@ -26,6 +24,8 @@ __docformat__ = "restructuredtext en"
 # Imports
 #-------------------------------------------------------------------------------
 
+from __future__ import division
+
 import types
 
 from IPython.utils.data import flatten as utils_flatten
@@ -67,7 +67,7 @@ class Map:
           return
           
         remainder = len(seq)%q
-        basesize = len(seq)/q
+        basesize = len(seq)//q
         hi = []
         lo = []
         for n in range(q):
diff --git a/IPython/parallel/client/remotefunction.py b/IPython/parallel/client/remotefunction.py
index cdaccdf..f3897bf 100644
--- a/IPython/parallel/client/remotefunction.py
+++ b/IPython/parallel/client/remotefunction.py
@@ -16,6 +16,8 @@ Authors:
 # Imports
 #-----------------------------------------------------------------------------
 
+from __future__ import division
+
 import warnings
 
 from IPython.testing.skipdoctest import skip_doctest
@@ -142,7 +144,7 @@ class ParallelFunction(RemoteFunction):
         balanced = 'Balanced' in self.view.__class__.__name__
         if balanced:
             if self.chunksize:
-                nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0)
+                nparts = len_0//self.chunksize + int(len_0%self.chunksize > 0)
             else:
                 nparts = len_0
             targets = [None]*nparts
@@ -169,7 +171,10 @@ class ParallelFunction(RemoteFunction):
             
             # print (args)
             if hasattr(self, '_map'):
-                f = map
+                if sys.version_info[0] >= 3:
+                    f = lambda f, *sequences: list(map(f, *sequences))
+                else:
+                    f = map
                 args = [self.func]+args
             else:
                 f=self.func
diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py
index be35241..c604bbb 100644
--- a/IPython/parallel/client/view.py
+++ b/IPython/parallel/client/view.py
@@ -969,6 +969,8 @@ class LoadBalancedView(View):
             idents = []
         else:
             idents = self.client._build_targets(targets)[0]
+            # ensure *not* bytes
+            idents = [ ident.decode() for ident in idents ]
         
         after = self._render_dependency(after)
         follow = self._render_dependency(follow)
diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py
index 5e231a3..47658c8 100644
--- a/IPython/parallel/controller/heartmonitor.py
+++ b/IPython/parallel/controller/heartmonitor.py
@@ -25,6 +25,8 @@ from zmq.eventloop import ioloop, zmqstream
 from IPython.config.configurable import LoggingConfigurable
 from IPython.utils.traitlets import Set, Instance, CFloat
 
+from IPython.parallel.util import ensure_bytes
+
 class Heart(object):
     """A basic heart object for responding to a HeartMonitor.
     This is a simple wrapper with defaults for the most common
@@ -42,9 +44,9 @@ class Heart(object):
         self.device.connect_in(in_addr)
         self.device.connect_out(out_addr)
         if in_type == zmq.SUB:
-            self.device.setsockopt_in(zmq.SUBSCRIBE, "")
+            self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
         if heart_id is None:
-            heart_id = str(uuid.uuid4())
+            heart_id = ensure_bytes(uuid.uuid4())
         self.device.setsockopt_out(zmq.IDENTITY, heart_id)
         self.id = heart_id
     
@@ -115,7 +117,7 @@ class HeartMonitor(LoggingConfigurable):
         self.responses = set()
         # print self.on_probation, self.hearts
         # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
-        self.pingstream.send(str(self.lifetime))
+        self.pingstream.send(ensure_bytes(str(self.lifetime)))
     
     def handle_new_heart(self, heart):
         if self._new_handlers:
@@ -140,11 +142,13 @@ class HeartMonitor(LoggingConfigurable):
     
     def handle_pong(self, msg):
         "a heart just beat"
-        if msg[1] == str(self.lifetime):
+        current = ensure_bytes(str(self.lifetime))
+        last = ensure_bytes(str(self.last_ping))
+        if msg[1] == current:
             delta = time.time()-self.tic
             # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
             self.responses.add(msg[0])
-        elif msg[1] == str(self.last_ping):
+        elif msg[1] == last:
             delta = time.time()-self.tic + (self.lifetime-self.last_ping)
             self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
             self.responses.add(msg[0])
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index 7ee5977..f601ce5 100755
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -289,7 +289,7 @@ class HubFactory(RegistrationFactory):
         # resubmit stream
         r = ZMQStream(ctx.socket(zmq.XREQ), loop)
         url = util.disambiguate_url(self.client_info['task'][-1])
-        r.setsockopt(zmq.IDENTITY, self.session.session)
+        r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
         r.connect(url)
 
         self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -380,14 +380,14 @@ class Hub(SessionFactory):
         self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
         self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
         
-        self.monitor_handlers = { 'in' : self.save_queue_request,
-                                'out': self.save_queue_result,
-                                'intask': self.save_task_request,
-                                'outtask': self.save_task_result,
-                                'tracktask': self.save_task_destination,
-                                'incontrol': _passer,
-                                'outcontrol': _passer,
-                                'iopub': self.save_iopub_message,
+        self.monitor_handlers = {b'in' : self.save_queue_request,
+                                b'out': self.save_queue_result,
+                                b'intask': self.save_task_request,
+                                b'outtask': self.save_task_result,
+                                b'tracktask': self.save_task_destination,
+                                b'incontrol': _passer,
+                                b'outcontrol': _passer,
+                                b'iopub': self.save_iopub_message,
         }
         
         self.query_handlers = {'queue_request': self.queue_status,
@@ -562,8 +562,9 @@ class Hub(SessionFactory):
             return
         record = init_record(msg)
         msg_id = record['msg_id']
-        record['engine_uuid'] = queue_id
-        record['client_uuid'] = client_id
+        # Unicode in records
+        record['engine_uuid'] = queue_id.decode('utf8', 'replace')
+        record['client_uuid'] = client_id.decode('utf8', 'replace')
         record['queue'] = 'mux'
 
         try:
@@ -751,7 +752,7 @@ class Hub(SessionFactory):
         # print (content)
         msg_id = content['msg_id']
         engine_uuid = content['engine_id']
-        eid = self.by_ident[engine_uuid]
+        eid = self.by_ident[util.ensure_bytes(engine_uuid)]
         
         self.log.info("task::task %r arrived on %r"%(msg_id, eid))
         if msg_id in self.unassigned:
@@ -833,7 +834,7 @@ class Hub(SessionFactory):
         jsonable = {}
         for k,v in self.keytable.iteritems():
             if v not in self.dead_engines:
-                jsonable[str(k)] = v
+                jsonable[str(k)] = v.decode()
         content['engines'] = jsonable
         self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
     
@@ -841,11 +842,13 @@ class Hub(SessionFactory):
         """Register a new engine."""
         content = msg['content']
         try:
-            queue = content['queue']
+            queue = util.ensure_bytes(content['queue'])
         except KeyError:
             self.log.error("registration::queue not specified", exc_info=True)
             return
         heart = content.get('heartbeat', None)
+        if heart:
+            heart = util.ensure_bytes(heart)
         """register a new engine, and create the socket(s) necessary"""
         eid = self._next_id
         # print (eid, queue, reg, heart)
@@ -912,7 +915,7 @@ class Hub(SessionFactory):
         self.log.info("registration::unregister_engine(%r)"%eid)
         # print (eid)
         uuid = self.keytable[eid]
-        content=dict(id=eid, queue=uuid)
+        content=dict(id=eid, queue=uuid.decode())
         self.dead_engines.add(uuid)
         # self.ids.remove(eid)
         # uuid = self.keytable.pop(eid)
@@ -980,7 +983,7 @@ class Hub(SessionFactory):
         self.tasks[eid] = list()
         self.completed[eid] = list()
         self.hearts[heart] = eid
-        content = dict(id=eid, queue=self.engines[eid].queue)
+        content = dict(id=eid, queue=self.engines[eid].queue.decode())
         if self.notifier:
             self.session.send(self.notifier, "registration_notification", content=content)
         self.log.info("engine::Engine Connected: %i"%eid)
@@ -1054,9 +1057,9 @@ class Hub(SessionFactory):
                 queue = len(queue)
                 completed = len(completed)
                 tasks = len(tasks)
-            content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
+            content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
         content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
-        
+        # print (content)
         self.session.send(self.query, "queue_reply", content=content, ident=client_id)
     
     def purge_results(self, client_id, msg):
@@ -1179,7 +1182,7 @@ class Hub(SessionFactory):
                             'io' : io_dict,
                           }
         if rec['result_buffers']:
-            buffers = map(str, rec['result_buffers'])
+            buffers = map(bytes, rec['result_buffers'])
         else:
             buffers = []
         
@@ -1281,7 +1284,7 @@ class Hub(SessionFactory):
                     buffers.extend(rb)
             content = dict(status='ok', records=records, buffer_lens=buffer_lens,
                                     result_buffer_lens=result_buffer_lens)
-        
+        # self.log.debug (content)
         self.session.send(self.query, "db_reply", content=content, 
                                             parent=msg, ident=client_id,
                                             buffers=buffers)
diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index af76c86..55ee059 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -40,11 +40,11 @@ from zmq.eventloop import ioloop, zmqstream
 from IPython.external.decorator import decorator
 from IPython.config.application import Application
 from IPython.config.loader import Config
-from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
+from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes
 
 from IPython.parallel import error
 from IPython.parallel.factory import SessionFactory
-from IPython.parallel.util import connect_logger, local_logger
+from IPython.parallel.util import connect_logger, local_logger, ensure_bytes
 
 from .dependency import Dependency
 
@@ -174,6 +174,10 @@ class TaskScheduler(SessionFactory):
     blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
     auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
     
+    ident = CBytes() # ZMQ identity. This should just be self.session.session
+                     # but ensure Bytes
+    def _ident_default(self):
+        return ensure_bytes(self.session.session)
     
     def start(self):
         self.engine_stream.on_recv(self.dispatch_result, copy=False)
@@ -204,7 +208,7 @@ class TaskScheduler(SessionFactory):
         try:
             idents,msg = self.session.feed_identities(msg)
         except ValueError:
-            self.log.warn("task::Invalid Message: %r"%msg)
+            self.log.warn("task::Invalid Message: %r",msg)
             return
         try:
             msg = self.session.unpack_message(msg)
@@ -219,15 +223,16 @@ class TaskScheduler(SessionFactory):
             self.log.error("Unhandled message type: %r"%msg_type)
         else:
             try:
-                handler(str(msg['content']['queue']))
-            except KeyError:
-                self.log.error("task::Invalid notification msg: %r"%msg)
+                handler(ensure_bytes(msg['content']['queue']))
+            except Exception:
+                self.log.error("task::Invalid notification msg: %r",msg)
     
     def _register_engine(self, uid):
         """New engine with ident `uid` became available."""
         # head of the line:
         self.targets.insert(0,uid)
         self.loads.insert(0,0)
+
         # initialize sets
         self.completed[uid] = set()
         self.failed[uid] = set()
@@ -309,14 +314,18 @@ class TaskScheduler(SessionFactory):
         
         
         # send to monitor
-        self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
+        self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
         
         header = msg['header']
         msg_id = header['msg_id']
         self.all_ids.add(msg_id)
         
-        # targets
-        targets = set(header.get('targets', []))
+        # get targets as a set of bytes objects
+        # from a list of unicode objects
+        targets = header.get('targets', [])
+        targets = map(ensure_bytes, targets)
+        targets = set(targets)
+            
         retries = header.get('retries', 0)
         self.retries[msg_id] = retries
         
@@ -412,7 +421,7 @@ class TaskScheduler(SessionFactory):
         
         msg = self.session.send(self.client_stream, 'apply_reply', content, 
                                                 parent=header, ident=idents)
-        self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
+        self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
         
         self.update_graph(msg_id, success=False)
     
@@ -494,9 +503,9 @@ class TaskScheduler(SessionFactory):
         self.add_job(idx)
         self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
         # notify Hub
-        content = dict(msg_id=msg_id, engine_id=target)
+        content = dict(msg_id=msg_id, engine_id=target.decode())
         self.session.send(self.mon_stream, 'task_destination', content=content, 
-                        ident=['tracktask',self.session.session])
+                        ident=[b'tracktask',self.ident])
         
     
     #-----------------------------------------------------------------------
@@ -533,7 +542,7 @@ class TaskScheduler(SessionFactory):
                 # relay to client and update graph
                 self.handle_result(idents, parent, raw_msg, success)
                 # send to Hub monitor
-                self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
+                self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
         else:
             self.handle_unmet_dependency(idents, parent)
         
diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py
index d7390a6..167cfd7 100644
--- a/IPython/parallel/controller/sqlitedb.py
+++ b/IPython/parallel/controller/sqlitedb.py
@@ -28,6 +28,12 @@ from IPython.utils.jsonutil import date_default, extract_dates, squash_dates
 # SQLite operators, adapters, and converters
 #-----------------------------------------------------------------------------
 
+try:
+    buffer
+except NameError:
+    # py3k
+    buffer = memoryview
+
 operators = {
  '$lt' : "<",
  '$gt' : ">",
@@ -54,7 +60,11 @@ def _convert_dict(ds):
     if ds is None:
         return ds
     else:
-        return extract_dates(json.loads(ds))
+        if isinstance(ds, bytes):
+            # If I understand the sqlite doc correctly, this will always be utf8
+            ds = ds.decode('utf8')
+        d = json.loads(ds)
+        return extract_dates(d)
 
 def _adapt_bufs(bufs):
     # this is *horrible*
diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py
index 80da049..43e911b 100755
--- a/IPython/parallel/engine/engine.py
+++ b/IPython/parallel/engine/engine.py
@@ -28,7 +28,7 @@ from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode
 
 from IPython.parallel.controller.heartmonitor import Heart
 from IPython.parallel.factory import RegistrationFactory
-from IPython.parallel.util import disambiguate_url
+from IPython.parallel.util import disambiguate_url, ensure_bytes
 
 from IPython.zmq.session import Message
 
@@ -65,7 +65,7 @@ class EngineFactory(RegistrationFactory):
         ctx = self.context
         
         reg = ctx.socket(zmq.XREQ)
-        reg.setsockopt(zmq.IDENTITY, self.ident)
+        reg.setsockopt(zmq.IDENTITY, ensure_bytes(self.ident))
         reg.connect(self.url)
         self.registrar = zmqstream.ZMQStream(reg, self.loop)
         
@@ -83,7 +83,7 @@ class EngineFactory(RegistrationFactory):
         self._abort_dc.stop()
         ctx = self.context
         loop = self.loop
-        identity = self.ident
+        identity = ensure_bytes(self.ident)
         
         idents,msg = self.session.feed_identities(msg)
         msg = Message(self.session.unpack_message(msg))
diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py
index 1df2d26..73bf8cc 100755
--- a/IPython/parallel/engine/streamkernel.py
+++ b/IPython/parallel/engine/streamkernel.py
@@ -35,12 +35,12 @@ import zmq
 from zmq.eventloop import ioloop, zmqstream
 
 # Local imports.
-from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
+from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
 from IPython.zmq.completer import KernelCompleter
 
 from IPython.parallel.error import wrap_exception
 from IPython.parallel.factory import SessionFactory
-from IPython.parallel.util import serialize_object, unpack_apply_message
+from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes
 
 def printer(*args):
     pprint(args, stream=sys.__stdout__)
@@ -73,8 +73,14 @@ class Kernel(SessionFactory):
     # kwargs:
     exec_lines = List(Unicode, config=True,
         help="List of lines to execute")
-
+    
+    # identities:
     int_id = Int(-1)
+    bident = CBytes()
+    ident = Unicode()
+    def _ident_changed(self, name, old, new):
+        self.bident = ensure_bytes(new)
+    
     user_ns = Dict(config=True,  help="""Set the user's namespace of the Kernel""")
     
     control_stream = Instance(zmqstream.ZMQStream)
@@ -193,6 +199,8 @@ class Kernel(SessionFactory):
         except:
             self.log.error("Invalid Message", exc_info=True)
             return
+        else:
+            self.log.debug("Control received, %s", msg)
         
         header = msg['header']
         msg_id = header['msg_id']
@@ -247,7 +255,7 @@ class Kernel(SessionFactory):
             self.log.error("Got bad msg: %s"%parent, exc_info=True)
             return
         self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
-                            ident='%s.pyin'%self.prefix)
+                            ident=ensure_bytes('%s.pyin'%self.prefix))
         started = datetime.now()
         try:
             comp_code = self.compiler(code, '<zmq-kernel>')
@@ -261,7 +269,7 @@ class Kernel(SessionFactory):
             exc_content = self._wrap_exception('execute')
             # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
             self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
-                            ident='%s.pyerr'%self.prefix)
+                            ident=ensure_bytes('%s.pyerr'%self.prefix))
             reply_content = exc_content
         else:
             reply_content = {'status' : 'ok'}
@@ -285,7 +293,6 @@ class Kernel(SessionFactory):
     def apply_request(self, stream, ident, parent):
         # flush previous reply, so this request won't block it
         stream.flush(zmq.POLLOUT)
-        
         try:
             content = parent[u'content']
             bufs = parent[u'buffers']
@@ -341,7 +348,7 @@ class Kernel(SessionFactory):
             exc_content = self._wrap_exception('apply')
             # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
             self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
-                                ident='%s.pyerr'%self.prefix)
+                                ident=ensure_bytes('%s.pyerr'%self.prefix))
             reply_content = exc_content
             result_buf = []
             
@@ -370,6 +377,8 @@ class Kernel(SessionFactory):
         except:
             self.log.error("Invalid Message", exc_info=True)
             return
+        else:
+            self.log.debug("Message received, %s", msg)
             
         
         header = msg['header']
diff --git a/IPython/parallel/tests/clienttest.py b/IPython/parallel/tests/clienttest.py
index 4ca0a24..6f22d88 100644
--- a/IPython/parallel/tests/clienttest.py
+++ b/IPython/parallel/tests/clienttest.py
@@ -41,9 +41,11 @@ def crash():
     if sys.platform.startswith('win'):
         import ctypes
         ctypes.windll.kernel32.SetErrorMode(0x0002);
-
-    co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
-                             (), (), (), '', '', 1, b'')
+    args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
+    if sys.version_info[0] >= 3:
+        args.insert(1, 0)
+        
+    co = types.CodeType(*args)
     exec(co)
 
 def wait(n):
diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py
index 5d76107..f9448ad 100644
--- a/IPython/parallel/tests/test_asyncresult.py
+++ b/IPython/parallel/tests/test_asyncresult.py
@@ -57,7 +57,7 @@ class AsyncResultTest(ClusterTestCase):
 
     def test_get_after_error(self):
         ar = self.client[-1].apply_async(lambda : 1/0)
-        ar.wait()
+        ar.wait(10)
         self.assertRaisesRemote(ZeroDivisionError, ar.get)
         self.assertRaisesRemote(ZeroDivisionError, ar.get)
         self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py
index 2153ba8..42c3b60 100644
--- a/IPython/parallel/tests/test_client.py
+++ b/IPython/parallel/tests/test_client.py
@@ -16,6 +16,8 @@ Authors:
 # Imports
 #-------------------------------------------------------------------------------
 
+from __future__ import division
+
 import time
 from datetime import datetime
 from tempfile import mktemp
@@ -132,7 +134,9 @@ class TestClient(ClusterTestCase):
         self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
         allqs = self.client.queue_status()
         self.assertTrue(isinstance(allqs, dict))
-        self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
+        intkeys = list(allqs.keys())
+        intkeys.remove('unassigned')
+        self.assertEquals(sorted(intkeys), sorted(self.client.ids))
         unassigned = allqs.pop('unassigned')
         for eid,qs in allqs.items():
             self.assertTrue(isinstance(qs, dict))
@@ -156,7 +160,7 @@ class TestClient(ClusterTestCase):
     def test_db_query_dt(self):
         """test db query by date"""
         hist = self.client.hub_history()
-        middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0]
+        middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
         tic = middle['submitted']
         before = self.client.db_query({'submitted' : {'$lt' : tic}})
         after = self.client.db_query({'submitted' : {'$gte' : tic}})
diff --git a/IPython/parallel/tests/test_db.py b/IPython/parallel/tests/test_db.py
index aee3b4a..fddb961 100644
--- a/IPython/parallel/tests/test_db.py
+++ b/IPython/parallel/tests/test_db.py
@@ -16,6 +16,7 @@ Authors:
 # Imports
 #-------------------------------------------------------------------------------
 
+from __future__ import division
 
 import tempfile
 import time
@@ -100,7 +101,7 @@ class TestDictBackend(TestCase):
     def test_find_records_dt(self):
         """test finding records by date"""
         hist = self.db.get_history()
-        middle = self.db.get_record(hist[len(hist)/2])
+        middle = self.db.get_record(hist[len(hist)//2])
         tic = middle['submitted']
         before = self.db.find_records({'submitted' : {'$lt' : tic}})
         after = self.db.find_records({'submitted' : {'$gte' : tic}})
@@ -168,7 +169,7 @@ class TestDictBackend(TestCase):
         query = {'msg_id' : {'$in':msg_ids}}
         self.db.drop_matching_records(query)
         recs = self.db.find_records(query)
-        self.assertTrue(len(recs)==0)
+        self.assertEquals(len(recs), 0)
             
 class TestSQLiteBackend(TestDictBackend):
     def create_db(self):
diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py
index 0407488..b07ec31 100644
--- a/IPython/parallel/tests/test_view.py
+++ b/IPython/parallel/tests/test_view.py
@@ -43,7 +43,7 @@ class TestView(ClusterTestCase):
         # self.add_engines(1)
         eid = self.client.ids[-1]
         ar = self.client[eid].apply_async(crash)
-        self.assertRaisesRemote(error.EngineError, ar.get)
+        self.assertRaisesRemote(error.EngineError, ar.get, 10)
         eid = ar.engine_id
         tic = time.time()
         while eid in self.client.ids and time.time()-tic < 5:
@@ -413,7 +413,10 @@ class TestView(ClusterTestCase):
         """test executing unicode strings"""
         v = self.client[-1]
         v.block=True
-        code=u"a=u'é'"
+        if sys.version_info[0] >= 3:
+            code="a='é'"
+        else:
+            code=u"a=u'é'"
         v.execute(code)
         self.assertEquals(v['a'], u'é')
         
@@ -433,7 +436,7 @@ class TestView(ClusterTestCase):
             assert isinstance(check, bytes), "%r is not bytes"%check
             assert a.encode('utf8') == check, "%s != %s"%(a,check)
         
-        for s in [ u'é', u'ßø®∫','asdf'.decode() ]:
+        for s in [ u'é', u'ßø®∫',u'asdf' ]:
             try:
                 v.apply_sync(check_unicode, s, s.encode('utf8'))
             except error.RemoteError as e:
diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py
index e366284..fe09943 100644
--- a/IPython/parallel/util.py
+++ b/IPython/parallel/util.py
@@ -101,6 +101,12 @@ class ReverseDict(dict):
 # Functions
 #-----------------------------------------------------------------------------
 
+def ensure_bytes(s):
+    """encode unicode to bytes; any other object is returned unchanged"""
+    if isinstance(s, unicode):
+        s = s.encode(sys.getdefaultencoding(), 'replace')
+    return s
+
 def validate_url(url):
     """validate a url for zeromq"""
     if not isinstance(url, basestring):
diff --git a/IPython/utils/codeutil.py b/IPython/utils/codeutil.py
index 31e0361..08f4155 100644
--- a/IPython/utils/codeutil.py
+++ b/IPython/utils/codeutil.py
@@ -23,6 +23,7 @@ __docformat__ = "restructuredtext en"
 # Imports
 #-------------------------------------------------------------------------------
 
+import sys
 import new, types, copy_reg
 
 def code_ctor(*args):
@@ -31,9 +32,12 @@ def code_ctor(*args):
 def reduce_code(co):
     if co.co_freevars or co.co_cellvars:
         raise ValueError("Sorry, cannot pickle code objects with closures")
-    return code_ctor, (co.co_argcount, co.co_nlocals, co.co_stacksize,
-        co.co_flags, co.co_code, co.co_consts, co.co_names,
-        co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno,
-        co.co_lnotab)
+    args =  [co.co_argcount, co.co_nlocals, co.co_stacksize,
+            co.co_flags, co.co_code, co.co_consts, co.co_names,
+            co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno,
+            co.co_lnotab]
+    if sys.version_info[0] >= 3:
+        args.insert(1, co.co_kwonlyargcount)
+    return code_ctor, tuple(args)
 
 copy_reg.pickle(types.CodeType, reduce_code)
\ No newline at end of file
diff --git a/IPython/utils/newserialized.py b/IPython/utils/newserialized.py
index 8fcab1f..d9259d2 100644
--- a/IPython/utils/newserialized.py
+++ b/IPython/utils/newserialized.py
@@ -19,16 +19,23 @@ __test__ = {}
 # Imports
 #-------------------------------------------------------------------------------
 
+import sys
 import cPickle as pickle
 
 try:
     import numpy
 except ImportError:
-    pass
+    numpy = None
 
 class SerializationError(Exception):
     pass
 
+if sys.version_info[0] >= 3:
+    buffer = memoryview
+    py3k = True
+else:
+    py3k = False
+
 #-----------------------------------------------------------------------------
 # Classes and functions
 #-----------------------------------------------------------------------------
@@ -93,8 +100,11 @@ class SerializeIt(object):
     def __init__(self, unSerialized):
         self.data = None
         self.obj = unSerialized.getObject()
-        if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
-            if len(self.obj.shape) == 0: # length 0 arrays are just pickled
+        if numpy is not None and isinstance(self.obj, numpy.ndarray):
+            if py3k or len(self.obj.shape) == 0: # 0-d arrays (shape == ()) are just pickled
+                # FIXME:
+                # also use pickle for numpy arrays on py3k, since
+                # pyzmq doesn't rebuild from memoryviews properly
                 self.typeDescriptor = 'pickle'
                 self.metadata = {}
             else:
@@ -102,7 +112,7 @@ class SerializeIt(object):
                 self.typeDescriptor = 'ndarray'
                 self.metadata = {'shape':self.obj.shape,
                                  'dtype':self.obj.dtype.str}
-        elif isinstance(self.obj, str):
+        elif isinstance(self.obj, bytes):
             self.typeDescriptor = 'bytes'
             self.metadata = {}
         elif isinstance(self.obj, buffer):
@@ -146,9 +156,9 @@ class UnSerializeIt(UnSerialized):
         
     def getObject(self):
         typeDescriptor = self.serialized.getTypeDescriptor()
-        if globals().has_key('numpy') and typeDescriptor == 'ndarray':
+        if numpy is not None and typeDescriptor == 'ndarray':
                 buf = self.serialized.getData()
-                if isinstance(buf, (str, buffer)):
+                if isinstance(buf, (bytes, buffer)):
                     result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
                 else:
                     # memoryview