From 9830094aecb7950b92ebcd673ce14f1d4e7a2e16 2011-07-03 21:14:10 From: MinRK Date: 2011-07-03 21:14:10 Subject: [PATCH] cleanup per review * remove extraneous 'd' variable in sqlitedb._convert_dict * don't use removed 'new' module in codeutil * add note about 3k-added kwonlyargcount in crash test * fix 'coupe' typo in client.client comment * rename ensure_bytes asbytes --- diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index d62ebd0..4678e8d 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -54,7 +54,7 @@ from IPython.parallel.controller.hub import HubFactory from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler from IPython.parallel.controller.sqlitedb import SQLiteDB -from IPython.parallel.util import signal_children, split_url, ensure_bytes +from IPython.parallel.util import signal_children, split_url, asbytes # conditional import of MongoDB backend class @@ -202,7 +202,7 @@ class IPControllerApp(BaseParallelApplication): # load from engine config with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f: cfg = json.loads(f.read()) - key = c.Session.key = ensure_bytes(cfg['exec_key']) + key = c.Session.key = asbytes(cfg['exec_key']) xport,addr = cfg['url'].split('://') c.HubFactory.engine_transport = xport ip,ports = addr.split(':') @@ -239,7 +239,7 @@ class IPControllerApp(BaseParallelApplication): # with open(keyfile, 'w') as f: # f.write(key) # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) - c.Session.key = ensure_bytes(key) + c.Session.key = asbytes(key) else: key = c.Session.key = b'' diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index b84be79..d4872c7 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -41,7 +41,7 @@ from IPython.config.configurable import Configurable from IPython.zmq.session import Session from IPython.parallel.engine.engine import EngineFactory from IPython.parallel.engine.streamkernel import Kernel -from IPython.parallel.util import disambiguate_url, ensure_bytes +from IPython.parallel.util import disambiguate_url, asbytes from IPython.utils.importstring import import_item from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float @@ -217,7 +217,7 @@ class IPEngineApp(BaseParallelApplication): with open(self.url_file) as f: d = json.loads(f.read()) if d['exec_key']: - config.Session.key = ensure_bytes(d['exec_key']) + config.Session.key = asbytes(d['exec_key']) d['url'] = disambiguate_url(d['url'], d['location']) config.EngineFactory.url = d['url'] config.EngineFactory.location = d['location'] diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 958d08e..c4ac985 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -50,7 +50,7 @@ from IPython.core.profiledir import ProfileDir, ProfileDirError from .view import DirectView, LoadBalancedView if sys.version_info[0] >= 3: - # xrange is used in a coupe 'isinstance' tests in py2 + # xrange is used in a couple 'isinstance' tests in py2 # should be just 'range' in 3k xrange = range @@ -362,12 +362,12 @@ class Client(HasTraits): if os.path.isfile(exec_key): extra_args['keyfile'] = exec_key else: - exec_key = util.ensure_bytes(exec_key) + exec_key = util.asbytes(exec_key) extra_args['key'] = exec_key self.session = Session(**extra_args) self._query_socket = self._context.socket(zmq.XREQ) - self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) + self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session)) if self._ssh: tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) else: @@ -460,7 +460,7 @@ class Client(HasTraits): if not isinstance(targets, (tuple, list, xrange)): raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) - return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets) + return [util.asbytes(self._engines[t]) for t in targets], list(targets) def _connect(self, sshserver, ssh_kwargs, timeout): """setup all our socket connections to the cluster. This is called from @@ -493,7 +493,7 @@ class Client(HasTraits): content = msg.content self._config['registration'] = dict(content) if content.status == 'ok': - ident = util.ensure_bytes(self.session.session) + ident = util.asbytes(self.session.session) if content.mux: self._mux_socket = self._context.socket(zmq.XREQ) self._mux_socket.setsockopt(zmq.IDENTITY, ident) diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index 23a1a92..e51b6c5 100644 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -25,7 +25,7 @@ from zmq.eventloop import ioloop, zmqstream from IPython.config.configurable import LoggingConfigurable from IPython.utils.traitlets import Set, Instance, CFloat -from IPython.parallel.util import ensure_bytes +from IPython.parallel.util import asbytes class Heart(object): """A basic heart object for responding to a HeartMonitor. @@ -117,7 +117,7 @@ class HeartMonitor(LoggingConfigurable): self.responses = set() # print self.on_probation, self.hearts # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) - self.pingstream.send(ensure_bytes(str(self.lifetime))) + self.pingstream.send(asbytes(str(self.lifetime))) def handle_new_heart(self, heart): if self._new_handlers: @@ -142,8 +142,8 @@ class HeartMonitor(LoggingConfigurable): def handle_pong(self, msg): "a heart just beat" - current = ensure_bytes(str(self.lifetime)) - last = ensure_bytes(str(self.last_ping)) + current = asbytes(str(self.lifetime)) + last = asbytes(str(self.last_ping)) if msg[1] == current: delta = time.time()-self.tic # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 8549e89..5a66178 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -289,7 +289,7 @@ class HubFactory(RegistrationFactory): # resubmit stream r = ZMQStream(ctx.socket(zmq.XREQ), loop) url = util.disambiguate_url(self.client_info['task'][-1]) - r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) + r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session)) r.connect(url) self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, @@ -752,7 +752,7 @@ class Hub(SessionFactory): # print (content) msg_id = content['msg_id'] engine_uuid = content['engine_id'] - eid = self.by_ident[util.ensure_bytes(engine_uuid)] + eid = self.by_ident[util.asbytes(engine_uuid)] self.log.info("task::task %r arrived on %r"%(msg_id, eid)) if msg_id in self.unassigned: @@ -842,13 +842,13 @@ class Hub(SessionFactory): """Register a new engine.""" content = msg['content'] try: - queue = util.ensure_bytes(content['queue']) + queue = util.asbytes(content['queue']) except KeyError: self.log.error("registration::queue not specified", exc_info=True) return heart = content.get('heartbeat', None) if heart: - heart = util.ensure_bytes(heart) + heart = util.asbytes(heart) """register a new engine, and create the socket(s) necessary""" eid = self._next_id # print (eid, queue, reg, heart) @@ -915,7 +915,7 @@ class Hub(SessionFactory): self.log.info("registration::unregister_engine(%r)"%eid) # print (eid) uuid = self.keytable[eid] - content=dict(id=eid, queue=uuid.decode()) + content=dict(id=eid, queue=uuid.decode('ascii')) self.dead_engines.add(uuid) # self.ids.remove(eid) # uuid = self.keytable.pop(eid) @@ -983,7 +983,7 @@ class Hub(SessionFactory): self.tasks[eid] = list() self.completed[eid] = list() self.hearts[heart] = eid - content = dict(id=eid, queue=self.engines[eid].queue.decode()) + content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii')) if self.notifier: self.session.send(self.notifier, "registration_notification", content=content) self.log.info("engine::Engine Connected: %i"%eid) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index ba86250..747d5b6 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -44,7 +44,7 @@ from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes from IPython.parallel import error from IPython.parallel.factory import SessionFactory -from IPython.parallel.util import connect_logger, local_logger, ensure_bytes +from IPython.parallel.util import connect_logger, local_logger, asbytes from .dependency import Dependency @@ -177,7 +177,7 @@ class TaskScheduler(SessionFactory): ident = CBytes() # ZMQ identity. This should just be self.session.session # but ensure Bytes def _ident_default(self): - return ensure_bytes(self.session.session) + return asbytes(self.session.session) def start(self): self.engine_stream.on_recv(self.dispatch_result, copy=False) @@ -223,7 +223,7 @@ class TaskScheduler(SessionFactory): self.log.error("Unhandled message type: %r"%msg_type) else: try: - handler(ensure_bytes(msg['content']['queue'])) + handler(asbytes(msg['content']['queue'])) except Exception: self.log.error("task::Invalid notification msg: %r",msg) @@ -323,7 +323,7 @@ class TaskScheduler(SessionFactory): # get targets as a set of bytes objects # from a list of unicode objects targets = header.get('targets', []) - targets = map(ensure_bytes, targets) + targets = map(asbytes, targets) targets = set(targets) retries = header.get('retries', 0) diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index 167cfd7..f5cdafc 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -63,8 +63,7 @@ def _convert_dict(ds): if isinstance(ds, bytes): # If I understand the sqlite doc correctly, this will always be utf8 ds = ds.decode('utf8') - d = json.loads(ds) - return extract_dates(d) + return extract_dates(json.loads(ds)) def _adapt_bufs(bufs): # this is *horrible* diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index d4b30d4..dd91f8c 100755 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -28,7 +28,7 @@ from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, from IPython.parallel.controller.heartmonitor import Heart from IPython.parallel.factory import RegistrationFactory -from IPython.parallel.util import disambiguate_url, ensure_bytes +from IPython.parallel.util import disambiguate_url, asbytes from IPython.zmq.session import Message @@ -61,7 +61,7 @@ class EngineFactory(RegistrationFactory): bident = CBytes() ident = Unicode() def _ident_changed(self, name, old, new): - self.bident = ensure_bytes(new) + self.bident = asbytes(new) def __init__(self, **kwargs): diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py index 73bf8cc..5e6203b 100755 --- a/IPython/parallel/engine/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -40,7 +40,7 @@ from IPython.zmq.completer import KernelCompleter from IPython.parallel.error import wrap_exception from IPython.parallel.factory import SessionFactory -from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes +from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes def printer(*args): pprint(args, stream=sys.__stdout__) @@ -79,7 +79,7 @@ class Kernel(SessionFactory): bident = CBytes() ident = Unicode() def _ident_changed(self, name, old, new): - self.bident = ensure_bytes(new) + self.bident = asbytes(new) user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""") @@ -255,7 +255,7 @@ class Kernel(SessionFactory): self.log.error("Got bad msg: %s"%parent, exc_info=True) return self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, - ident=ensure_bytes('%s.pyin'%self.prefix)) + ident=asbytes('%s.pyin'%self.prefix)) started = datetime.now() try: comp_code = self.compiler(code, '') @@ -269,7 +269,7 @@ class Kernel(SessionFactory): exc_content = self._wrap_exception('execute') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident=ensure_bytes('%s.pyerr'%self.prefix)) + ident=asbytes('%s.pyerr'%self.prefix)) reply_content = exc_content else: reply_content = {'status' : 'ok'} @@ -348,7 +348,7 @@ class Kernel(SessionFactory): exc_content = self._wrap_exception('apply') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident=ensure_bytes('%s.pyerr'%self.prefix)) + ident=asbytes('%s.pyerr'%self.prefix)) reply_content = exc_content result_buf = [] diff --git a/IPython/parallel/tests/clienttest.py b/IPython/parallel/tests/clienttest.py index 6f22d88..07635c2 100644 --- a/IPython/parallel/tests/clienttest.py +++ b/IPython/parallel/tests/clienttest.py @@ -43,6 +43,7 @@ def crash(): ctypes.windll.kernel32.SetErrorMode(0x0002); args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b''] if sys.version_info[0] >= 3: + # Python3 adds 'kwonlyargcount' as the second argument to Code args.insert(1, 0) co = types.CodeType(*args) diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index 486b894..104cc7e 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -101,7 +101,7 @@ class ReverseDict(dict): # Functions #----------------------------------------------------------------------------- -def ensure_bytes(s): +def asbytes(s): """ensure that an object is ascii bytes""" if isinstance(s, unicode): s = s.encode('ascii') diff --git a/IPython/utils/codeutil.py b/IPython/utils/codeutil.py index 08f4155..ffefe52 100644 --- a/IPython/utils/codeutil.py +++ b/IPython/utils/codeutil.py @@ -24,10 +24,10 @@ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- import sys -import new, types, copy_reg +import types, copy_reg def code_ctor(*args): - return new.code(*args) + return types.CodeType(*args) def reduce_code(co): if co.co_freevars or co.co_cellvars: