##// END OF EJS Templates
cleanup per review...
MinRK -
Show More
@@ -54,7 +54,7 b' from IPython.parallel.controller.hub import HubFactory'
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56
56
57 from IPython.parallel.util import signal_children, split_url, ensure_bytes
57 from IPython.parallel.util import signal_children, split_url, asbytes
58
58
59 # conditional import of MongoDB backend class
59 # conditional import of MongoDB backend class
60
60
@@ -202,7 +202,7 b' class IPControllerApp(BaseParallelApplication):'
202 # load from engine config
202 # load from engine config
203 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
203 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
204 cfg = json.loads(f.read())
204 cfg = json.loads(f.read())
205 key = c.Session.key = ensure_bytes(cfg['exec_key'])
205 key = c.Session.key = asbytes(cfg['exec_key'])
206 xport,addr = cfg['url'].split('://')
206 xport,addr = cfg['url'].split('://')
207 c.HubFactory.engine_transport = xport
207 c.HubFactory.engine_transport = xport
208 ip,ports = addr.split(':')
208 ip,ports = addr.split(':')
@@ -239,7 +239,7 b' class IPControllerApp(BaseParallelApplication):'
239 # with open(keyfile, 'w') as f:
239 # with open(keyfile, 'w') as f:
240 # f.write(key)
240 # f.write(key)
241 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
241 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
242 c.Session.key = ensure_bytes(key)
242 c.Session.key = asbytes(key)
243 else:
243 else:
244 key = c.Session.key = b''
244 key = c.Session.key = b''
245
245
@@ -41,7 +41,7 b' from IPython.config.configurable import Configurable'
41 from IPython.zmq.session import Session
41 from IPython.zmq.session import Session
42 from IPython.parallel.engine.engine import EngineFactory
42 from IPython.parallel.engine.engine import EngineFactory
43 from IPython.parallel.engine.streamkernel import Kernel
43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url, ensure_bytes
44 from IPython.parallel.util import disambiguate_url, asbytes
45
45
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
@@ -217,7 +217,7 b' class IPEngineApp(BaseParallelApplication):'
217 with open(self.url_file) as f:
217 with open(self.url_file) as f:
218 d = json.loads(f.read())
218 d = json.loads(f.read())
219 if d['exec_key']:
219 if d['exec_key']:
220 config.Session.key = ensure_bytes(d['exec_key'])
220 config.Session.key = asbytes(d['exec_key'])
221 d['url'] = disambiguate_url(d['url'], d['location'])
221 d['url'] = disambiguate_url(d['url'], d['location'])
222 config.EngineFactory.url = d['url']
222 config.EngineFactory.url = d['url']
223 config.EngineFactory.location = d['location']
223 config.EngineFactory.location = d['location']
@@ -50,7 +50,7 b' from IPython.core.profiledir import ProfileDir, ProfileDirError'
50 from .view import DirectView, LoadBalancedView
50 from .view import DirectView, LoadBalancedView
51
51
52 if sys.version_info[0] >= 3:
52 if sys.version_info[0] >= 3:
53 # xrange is used in a coupe 'isinstance' tests in py2
53 # xrange is used in a couple 'isinstance' tests in py2
54 # should be just 'range' in 3k
54 # should be just 'range' in 3k
55 xrange = range
55 xrange = range
56
56
@@ -362,12 +362,12 b' class Client(HasTraits):'
362 if os.path.isfile(exec_key):
362 if os.path.isfile(exec_key):
363 extra_args['keyfile'] = exec_key
363 extra_args['keyfile'] = exec_key
364 else:
364 else:
365 exec_key = util.ensure_bytes(exec_key)
365 exec_key = util.asbytes(exec_key)
366 extra_args['key'] = exec_key
366 extra_args['key'] = exec_key
367 self.session = Session(**extra_args)
367 self.session = Session(**extra_args)
368
368
369 self._query_socket = self._context.socket(zmq.XREQ)
369 self._query_socket = self._context.socket(zmq.XREQ)
370 self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
370 self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
371 if self._ssh:
371 if self._ssh:
372 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
372 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
373 else:
373 else:
@@ -460,7 +460,7 b' class Client(HasTraits):'
460 if not isinstance(targets, (tuple, list, xrange)):
460 if not isinstance(targets, (tuple, list, xrange)):
461 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
461 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
462
462
463 return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets)
463 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
464
464
465 def _connect(self, sshserver, ssh_kwargs, timeout):
465 def _connect(self, sshserver, ssh_kwargs, timeout):
466 """setup all our socket connections to the cluster. This is called from
466 """setup all our socket connections to the cluster. This is called from
@@ -493,7 +493,7 b' class Client(HasTraits):'
493 content = msg.content
493 content = msg.content
494 self._config['registration'] = dict(content)
494 self._config['registration'] = dict(content)
495 if content.status == 'ok':
495 if content.status == 'ok':
496 ident = util.ensure_bytes(self.session.session)
496 ident = util.asbytes(self.session.session)
497 if content.mux:
497 if content.mux:
498 self._mux_socket = self._context.socket(zmq.XREQ)
498 self._mux_socket = self._context.socket(zmq.XREQ)
499 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
499 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
@@ -25,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream'
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat
26 from IPython.utils.traitlets import Set, Instance, CFloat
27
27
28 from IPython.parallel.util import ensure_bytes
28 from IPython.parallel.util import asbytes
29
29
30 class Heart(object):
30 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
@@ -117,7 +117,7 b' class HeartMonitor(LoggingConfigurable):'
117 self.responses = set()
117 self.responses = set()
118 # print self.on_probation, self.hearts
118 # print self.on_probation, self.hearts
119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
120 self.pingstream.send(ensure_bytes(str(self.lifetime)))
120 self.pingstream.send(asbytes(str(self.lifetime)))
121
121
122 def handle_new_heart(self, heart):
122 def handle_new_heart(self, heart):
123 if self._new_handlers:
123 if self._new_handlers:
@@ -142,8 +142,8 b' class HeartMonitor(LoggingConfigurable):'
142
142
143 def handle_pong(self, msg):
143 def handle_pong(self, msg):
144 "a heart just beat"
144 "a heart just beat"
145 current = ensure_bytes(str(self.lifetime))
145 current = asbytes(str(self.lifetime))
146 last = ensure_bytes(str(self.last_ping))
146 last = asbytes(str(self.last_ping))
147 if msg[1] == current:
147 if msg[1] == current:
148 delta = time.time()-self.tic
148 delta = time.time()-self.tic
149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
@@ -289,7 +289,7 b' class HubFactory(RegistrationFactory):'
289 # resubmit stream
289 # resubmit stream
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
291 url = util.disambiguate_url(self.client_info['task'][-1])
291 url = util.disambiguate_url(self.client_info['task'][-1])
292 r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
292 r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
293 r.connect(url)
293 r.connect(url)
294
294
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -752,7 +752,7 b' class Hub(SessionFactory):'
752 # print (content)
752 # print (content)
753 msg_id = content['msg_id']
753 msg_id = content['msg_id']
754 engine_uuid = content['engine_id']
754 engine_uuid = content['engine_id']
755 eid = self.by_ident[util.ensure_bytes(engine_uuid)]
755 eid = self.by_ident[util.asbytes(engine_uuid)]
756
756
757 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
757 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
758 if msg_id in self.unassigned:
758 if msg_id in self.unassigned:
@@ -842,13 +842,13 b' class Hub(SessionFactory):'
842 """Register a new engine."""
842 """Register a new engine."""
843 content = msg['content']
843 content = msg['content']
844 try:
844 try:
845 queue = util.ensure_bytes(content['queue'])
845 queue = util.asbytes(content['queue'])
846 except KeyError:
846 except KeyError:
847 self.log.error("registration::queue not specified", exc_info=True)
847 self.log.error("registration::queue not specified", exc_info=True)
848 return
848 return
849 heart = content.get('heartbeat', None)
849 heart = content.get('heartbeat', None)
850 if heart:
850 if heart:
851 heart = util.ensure_bytes(heart)
851 heart = util.asbytes(heart)
852 """register a new engine, and create the socket(s) necessary"""
852 """register a new engine, and create the socket(s) necessary"""
853 eid = self._next_id
853 eid = self._next_id
854 # print (eid, queue, reg, heart)
854 # print (eid, queue, reg, heart)
@@ -915,7 +915,7 b' class Hub(SessionFactory):'
915 self.log.info("registration::unregister_engine(%r)"%eid)
915 self.log.info("registration::unregister_engine(%r)"%eid)
916 # print (eid)
916 # print (eid)
917 uuid = self.keytable[eid]
917 uuid = self.keytable[eid]
918 content=dict(id=eid, queue=uuid.decode())
918 content=dict(id=eid, queue=uuid.decode('ascii'))
919 self.dead_engines.add(uuid)
919 self.dead_engines.add(uuid)
920 # self.ids.remove(eid)
920 # self.ids.remove(eid)
921 # uuid = self.keytable.pop(eid)
921 # uuid = self.keytable.pop(eid)
@@ -983,7 +983,7 b' class Hub(SessionFactory):'
983 self.tasks[eid] = list()
983 self.tasks[eid] = list()
984 self.completed[eid] = list()
984 self.completed[eid] = list()
985 self.hearts[heart] = eid
985 self.hearts[heart] = eid
986 content = dict(id=eid, queue=self.engines[eid].queue.decode())
986 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
987 if self.notifier:
987 if self.notifier:
988 self.session.send(self.notifier, "registration_notification", content=content)
988 self.session.send(self.notifier, "registration_notification", content=content)
989 self.log.info("engine::Engine Connected: %i"%eid)
989 self.log.info("engine::Engine Connected: %i"%eid)
@@ -44,7 +44,7 b' from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes'
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger, ensure_bytes
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
@@ -177,7 +177,7 b' class TaskScheduler(SessionFactory):'
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 # but ensure Bytes
178 # but ensure Bytes
179 def _ident_default(self):
179 def _ident_default(self):
180 return ensure_bytes(self.session.session)
180 return asbytes(self.session.session)
181
181
182 def start(self):
182 def start(self):
183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
@@ -223,7 +223,7 b' class TaskScheduler(SessionFactory):'
223 self.log.error("Unhandled message type: %r"%msg_type)
223 self.log.error("Unhandled message type: %r"%msg_type)
224 else:
224 else:
225 try:
225 try:
226 handler(ensure_bytes(msg['content']['queue']))
226 handler(asbytes(msg['content']['queue']))
227 except Exception:
227 except Exception:
228 self.log.error("task::Invalid notification msg: %r",msg)
228 self.log.error("task::Invalid notification msg: %r",msg)
229
229
@@ -323,7 +323,7 b' class TaskScheduler(SessionFactory):'
323 # get targets as a set of bytes objects
323 # get targets as a set of bytes objects
324 # from a list of unicode objects
324 # from a list of unicode objects
325 targets = header.get('targets', [])
325 targets = header.get('targets', [])
326 targets = map(ensure_bytes, targets)
326 targets = map(asbytes, targets)
327 targets = set(targets)
327 targets = set(targets)
328
328
329 retries = header.get('retries', 0)
329 retries = header.get('retries', 0)
@@ -63,8 +63,7 b' def _convert_dict(ds):'
63 if isinstance(ds, bytes):
63 if isinstance(ds, bytes):
64 # If I understand the sqlite doc correctly, this will always be utf8
64 # If I understand the sqlite doc correctly, this will always be utf8
65 ds = ds.decode('utf8')
65 ds = ds.decode('utf8')
66 d = json.loads(ds)
66 return extract_dates(json.loads(ds))
67 return extract_dates(d)
68
67
69 def _adapt_bufs(bufs):
68 def _adapt_bufs(bufs):
70 # this is *horrible*
69 # this is *horrible*
@@ -28,7 +28,7 b' from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, '
28
28
29 from IPython.parallel.controller.heartmonitor import Heart
29 from IPython.parallel.controller.heartmonitor import Heart
30 from IPython.parallel.factory import RegistrationFactory
30 from IPython.parallel.factory import RegistrationFactory
31 from IPython.parallel.util import disambiguate_url, ensure_bytes
31 from IPython.parallel.util import disambiguate_url, asbytes
32
32
33 from IPython.zmq.session import Message
33 from IPython.zmq.session import Message
34
34
@@ -61,7 +61,7 b' class EngineFactory(RegistrationFactory):'
61 bident = CBytes()
61 bident = CBytes()
62 ident = Unicode()
62 ident = Unicode()
63 def _ident_changed(self, name, old, new):
63 def _ident_changed(self, name, old, new):
64 self.bident = ensure_bytes(new)
64 self.bident = asbytes(new)
65
65
66
66
67 def __init__(self, **kwargs):
67 def __init__(self, **kwargs):
@@ -40,7 +40,7 b' from IPython.zmq.completer import KernelCompleter'
40
40
41 from IPython.parallel.error import wrap_exception
41 from IPython.parallel.error import wrap_exception
42 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
44
44
45 def printer(*args):
45 def printer(*args):
46 pprint(args, stream=sys.__stdout__)
46 pprint(args, stream=sys.__stdout__)
@@ -79,7 +79,7 b' class Kernel(SessionFactory):'
79 bident = CBytes()
79 bident = CBytes()
80 ident = Unicode()
80 ident = Unicode()
81 def _ident_changed(self, name, old, new):
81 def _ident_changed(self, name, old, new):
82 self.bident = ensure_bytes(new)
82 self.bident = asbytes(new)
83
83
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
85
85
@@ -255,7 +255,7 b' class Kernel(SessionFactory):'
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 return
256 return
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 ident=ensure_bytes('%s.pyin'%self.prefix))
258 ident=asbytes('%s.pyin'%self.prefix))
259 started = datetime.now()
259 started = datetime.now()
260 try:
260 try:
261 comp_code = self.compiler(code, '<zmq-kernel>')
261 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -269,7 +269,7 b' class Kernel(SessionFactory):'
269 exc_content = self._wrap_exception('execute')
269 exc_content = self._wrap_exception('execute')
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 ident=ensure_bytes('%s.pyerr'%self.prefix))
272 ident=asbytes('%s.pyerr'%self.prefix))
273 reply_content = exc_content
273 reply_content = exc_content
274 else:
274 else:
275 reply_content = {'status' : 'ok'}
275 reply_content = {'status' : 'ok'}
@@ -348,7 +348,7 b' class Kernel(SessionFactory):'
348 exc_content = self._wrap_exception('apply')
348 exc_content = self._wrap_exception('apply')
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 ident=ensure_bytes('%s.pyerr'%self.prefix))
351 ident=asbytes('%s.pyerr'%self.prefix))
352 reply_content = exc_content
352 reply_content = exc_content
353 result_buf = []
353 result_buf = []
354
354
@@ -43,6 +43,7 b' def crash():'
43 ctypes.windll.kernel32.SetErrorMode(0x0002);
43 ctypes.windll.kernel32.SetErrorMode(0x0002);
44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
45 if sys.version_info[0] >= 3:
45 if sys.version_info[0] >= 3:
46 # Python3 adds 'kwonlyargcount' as the second argument to Code
46 args.insert(1, 0)
47 args.insert(1, 0)
47
48
48 co = types.CodeType(*args)
49 co = types.CodeType(*args)
@@ -101,7 +101,7 b' class ReverseDict(dict):'
101 # Functions
101 # Functions
102 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
103
103
104 def ensure_bytes(s):
104 def asbytes(s):
105 """ensure that an object is ascii bytes"""
105 """ensure that an object is ascii bytes"""
106 if isinstance(s, unicode):
106 if isinstance(s, unicode):
107 s = s.encode('ascii')
107 s = s.encode('ascii')
@@ -24,10 +24,10 b' __docformat__ = "restructuredtext en"'
24 #-------------------------------------------------------------------------------
24 #-------------------------------------------------------------------------------
25
25
26 import sys
26 import sys
27 import new, types, copy_reg
27 import types, copy_reg
28
28
29 def code_ctor(*args):
29 def code_ctor(*args):
30 return new.code(*args)
30 return types.CodeType(*args)
31
31
32 def reduce_code(co):
32 def reduce_code(co):
33 if co.co_freevars or co.co_cellvars:
33 if co.co_freevars or co.co_cellvars:
General Comments 0
You need to be logged in to leave comments. Login now