##// END OF EJS Templates
cleanup per review...
MinRK -
Show More
@@ -54,7 +54,7 b' from IPython.parallel.controller.hub import HubFactory'
54 54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 56
57 from IPython.parallel.util import signal_children, split_url, ensure_bytes
57 from IPython.parallel.util import signal_children, split_url, asbytes
58 58
59 59 # conditional import of MongoDB backend class
60 60
@@ -202,7 +202,7 b' class IPControllerApp(BaseParallelApplication):'
202 202 # load from engine config
203 203 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
204 204 cfg = json.loads(f.read())
205 key = c.Session.key = ensure_bytes(cfg['exec_key'])
205 key = c.Session.key = asbytes(cfg['exec_key'])
206 206 xport,addr = cfg['url'].split('://')
207 207 c.HubFactory.engine_transport = xport
208 208 ip,ports = addr.split(':')
@@ -239,7 +239,7 b' class IPControllerApp(BaseParallelApplication):'
239 239 # with open(keyfile, 'w') as f:
240 240 # f.write(key)
241 241 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
242 c.Session.key = ensure_bytes(key)
242 c.Session.key = asbytes(key)
243 243 else:
244 244 key = c.Session.key = b''
245 245
@@ -41,7 +41,7 b' from IPython.config.configurable import Configurable'
41 41 from IPython.zmq.session import Session
42 42 from IPython.parallel.engine.engine import EngineFactory
43 43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url, ensure_bytes
44 from IPython.parallel.util import disambiguate_url, asbytes
45 45
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
@@ -217,7 +217,7 b' class IPEngineApp(BaseParallelApplication):'
217 217 with open(self.url_file) as f:
218 218 d = json.loads(f.read())
219 219 if d['exec_key']:
220 config.Session.key = ensure_bytes(d['exec_key'])
220 config.Session.key = asbytes(d['exec_key'])
221 221 d['url'] = disambiguate_url(d['url'], d['location'])
222 222 config.EngineFactory.url = d['url']
223 223 config.EngineFactory.location = d['location']
@@ -50,7 +50,7 b' from IPython.core.profiledir import ProfileDir, ProfileDirError'
50 50 from .view import DirectView, LoadBalancedView
51 51
52 52 if sys.version_info[0] >= 3:
53 # xrange is used in a coupe 'isinstance' tests in py2
53 # xrange is used in a couple 'isinstance' tests in py2
54 54 # should be just 'range' in 3k
55 55 xrange = range
56 56
@@ -362,12 +362,12 b' class Client(HasTraits):'
362 362 if os.path.isfile(exec_key):
363 363 extra_args['keyfile'] = exec_key
364 364 else:
365 exec_key = util.ensure_bytes(exec_key)
365 exec_key = util.asbytes(exec_key)
366 366 extra_args['key'] = exec_key
367 367 self.session = Session(**extra_args)
368 368
369 369 self._query_socket = self._context.socket(zmq.XREQ)
370 self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
370 self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
371 371 if self._ssh:
372 372 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
373 373 else:
@@ -460,7 +460,7 b' class Client(HasTraits):'
460 460 if not isinstance(targets, (tuple, list, xrange)):
461 461 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
462 462
463 return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets)
463 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
464 464
465 465 def _connect(self, sshserver, ssh_kwargs, timeout):
466 466 """setup all our socket connections to the cluster. This is called from
@@ -493,7 +493,7 b' class Client(HasTraits):'
493 493 content = msg.content
494 494 self._config['registration'] = dict(content)
495 495 if content.status == 'ok':
496 ident = util.ensure_bytes(self.session.session)
496 ident = util.asbytes(self.session.session)
497 497 if content.mux:
498 498 self._mux_socket = self._context.socket(zmq.XREQ)
499 499 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
@@ -25,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream'
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.traitlets import Set, Instance, CFloat
27 27
28 from IPython.parallel.util import ensure_bytes
28 from IPython.parallel.util import asbytes
29 29
30 30 class Heart(object):
31 31 """A basic heart object for responding to a HeartMonitor.
@@ -117,7 +117,7 b' class HeartMonitor(LoggingConfigurable):'
117 117 self.responses = set()
118 118 # print self.on_probation, self.hearts
119 119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
120 self.pingstream.send(ensure_bytes(str(self.lifetime)))
120 self.pingstream.send(asbytes(str(self.lifetime)))
121 121
122 122 def handle_new_heart(self, heart):
123 123 if self._new_handlers:
@@ -142,8 +142,8 b' class HeartMonitor(LoggingConfigurable):'
142 142
143 143 def handle_pong(self, msg):
144 144 "a heart just beat"
145 current = ensure_bytes(str(self.lifetime))
146 last = ensure_bytes(str(self.last_ping))
145 current = asbytes(str(self.lifetime))
146 last = asbytes(str(self.last_ping))
147 147 if msg[1] == current:
148 148 delta = time.time()-self.tic
149 149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
@@ -289,7 +289,7 b' class HubFactory(RegistrationFactory):'
289 289 # resubmit stream
290 290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
291 291 url = util.disambiguate_url(self.client_info['task'][-1])
292 r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
292 r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
293 293 r.connect(url)
294 294
295 295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -752,7 +752,7 b' class Hub(SessionFactory):'
752 752 # print (content)
753 753 msg_id = content['msg_id']
754 754 engine_uuid = content['engine_id']
755 eid = self.by_ident[util.ensure_bytes(engine_uuid)]
755 eid = self.by_ident[util.asbytes(engine_uuid)]
756 756
757 757 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
758 758 if msg_id in self.unassigned:
@@ -842,13 +842,13 b' class Hub(SessionFactory):'
842 842 """Register a new engine."""
843 843 content = msg['content']
844 844 try:
845 queue = util.ensure_bytes(content['queue'])
845 queue = util.asbytes(content['queue'])
846 846 except KeyError:
847 847 self.log.error("registration::queue not specified", exc_info=True)
848 848 return
849 849 heart = content.get('heartbeat', None)
850 850 if heart:
851 heart = util.ensure_bytes(heart)
851 heart = util.asbytes(heart)
852 852 """register a new engine, and create the socket(s) necessary"""
853 853 eid = self._next_id
854 854 # print (eid, queue, reg, heart)
@@ -915,7 +915,7 b' class Hub(SessionFactory):'
915 915 self.log.info("registration::unregister_engine(%r)"%eid)
916 916 # print (eid)
917 917 uuid = self.keytable[eid]
918 content=dict(id=eid, queue=uuid.decode())
918 content=dict(id=eid, queue=uuid.decode('ascii'))
919 919 self.dead_engines.add(uuid)
920 920 # self.ids.remove(eid)
921 921 # uuid = self.keytable.pop(eid)
@@ -983,7 +983,7 b' class Hub(SessionFactory):'
983 983 self.tasks[eid] = list()
984 984 self.completed[eid] = list()
985 985 self.hearts[heart] = eid
986 content = dict(id=eid, queue=self.engines[eid].queue.decode())
986 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
987 987 if self.notifier:
988 988 self.session.send(self.notifier, "registration_notification", content=content)
989 989 self.log.info("engine::Engine Connected: %i"%eid)
@@ -44,7 +44,7 b' from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes'
44 44
45 45 from IPython.parallel import error
46 46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger, ensure_bytes
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48 48
49 49 from .dependency import Dependency
50 50
@@ -177,7 +177,7 b' class TaskScheduler(SessionFactory):'
177 177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 178 # but ensure Bytes
179 179 def _ident_default(self):
180 return ensure_bytes(self.session.session)
180 return asbytes(self.session.session)
181 181
182 182 def start(self):
183 183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
@@ -223,7 +223,7 b' class TaskScheduler(SessionFactory):'
223 223 self.log.error("Unhandled message type: %r"%msg_type)
224 224 else:
225 225 try:
226 handler(ensure_bytes(msg['content']['queue']))
226 handler(asbytes(msg['content']['queue']))
227 227 except Exception:
228 228 self.log.error("task::Invalid notification msg: %r",msg)
229 229
@@ -323,7 +323,7 b' class TaskScheduler(SessionFactory):'
323 323 # get targets as a set of bytes objects
324 324 # from a list of unicode objects
325 325 targets = header.get('targets', [])
326 targets = map(ensure_bytes, targets)
326 targets = map(asbytes, targets)
327 327 targets = set(targets)
328 328
329 329 retries = header.get('retries', 0)
@@ -63,8 +63,7 b' def _convert_dict(ds):'
63 63 if isinstance(ds, bytes):
64 64 # If I understand the sqlite doc correctly, this will always be utf8
65 65 ds = ds.decode('utf8')
66 d = json.loads(ds)
67 return extract_dates(d)
66 return extract_dates(json.loads(ds))
68 67
69 68 def _adapt_bufs(bufs):
70 69 # this is *horrible*
@@ -28,7 +28,7 b' from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, '
28 28
29 29 from IPython.parallel.controller.heartmonitor import Heart
30 30 from IPython.parallel.factory import RegistrationFactory
31 from IPython.parallel.util import disambiguate_url, ensure_bytes
31 from IPython.parallel.util import disambiguate_url, asbytes
32 32
33 33 from IPython.zmq.session import Message
34 34
@@ -61,7 +61,7 b' class EngineFactory(RegistrationFactory):'
61 61 bident = CBytes()
62 62 ident = Unicode()
63 63 def _ident_changed(self, name, old, new):
64 self.bident = ensure_bytes(new)
64 self.bident = asbytes(new)
65 65
66 66
67 67 def __init__(self, **kwargs):
@@ -40,7 +40,7 b' from IPython.zmq.completer import KernelCompleter'
40 40
41 41 from IPython.parallel.error import wrap_exception
42 42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
44 44
45 45 def printer(*args):
46 46 pprint(args, stream=sys.__stdout__)
@@ -79,7 +79,7 b' class Kernel(SessionFactory):'
79 79 bident = CBytes()
80 80 ident = Unicode()
81 81 def _ident_changed(self, name, old, new):
82 self.bident = ensure_bytes(new)
82 self.bident = asbytes(new)
83 83
84 84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
85 85
@@ -255,7 +255,7 b' class Kernel(SessionFactory):'
255 255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 256 return
257 257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 ident=ensure_bytes('%s.pyin'%self.prefix))
258 ident=asbytes('%s.pyin'%self.prefix))
259 259 started = datetime.now()
260 260 try:
261 261 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -269,7 +269,7 b' class Kernel(SessionFactory):'
269 269 exc_content = self._wrap_exception('execute')
270 270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 ident=ensure_bytes('%s.pyerr'%self.prefix))
272 ident=asbytes('%s.pyerr'%self.prefix))
273 273 reply_content = exc_content
274 274 else:
275 275 reply_content = {'status' : 'ok'}
@@ -348,7 +348,7 b' class Kernel(SessionFactory):'
348 348 exc_content = self._wrap_exception('apply')
349 349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 ident=ensure_bytes('%s.pyerr'%self.prefix))
351 ident=asbytes('%s.pyerr'%self.prefix))
352 352 reply_content = exc_content
353 353 result_buf = []
354 354
@@ -43,6 +43,7 b' def crash():'
43 43 ctypes.windll.kernel32.SetErrorMode(0x0002);
44 44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
45 45 if sys.version_info[0] >= 3:
46 # Python3 adds 'kwonlyargcount' as the second argument to Code
46 47 args.insert(1, 0)
47 48
48 49 co = types.CodeType(*args)
@@ -101,7 +101,7 b' class ReverseDict(dict):'
101 101 # Functions
102 102 #-----------------------------------------------------------------------------
103 103
104 def ensure_bytes(s):
104 def asbytes(s):
105 105 """ensure that an object is ascii bytes"""
106 106 if isinstance(s, unicode):
107 107 s = s.encode('ascii')
@@ -24,10 +24,10 b' __docformat__ = "restructuredtext en"'
24 24 #-------------------------------------------------------------------------------
25 25
26 26 import sys
27 import new, types, copy_reg
27 import types, copy_reg
28 28
29 29 def code_ctor(*args):
30 return new.code(*args)
30 return types.CodeType(*args)
31 31
32 32 def reduce_code(co):
33 33 if co.co_freevars or co.co_cellvars:
General Comments 0
You need to be logged in to leave comments. Login now