##// END OF EJS Templates
update parallel code for py3k...
MinRK -
Show More
@@ -54,7 +54,7 b' from IPython.parallel.controller.hub import HubFactory'
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56
56
57 from IPython.parallel.util import signal_children, split_url
57 from IPython.parallel.util import signal_children, split_url, ensure_bytes
58
58
59 # conditional import of MongoDB backend class
59 # conditional import of MongoDB backend class
60
60
@@ -202,14 +202,13 b' class IPControllerApp(BaseParallelApplication):'
202 # load from engine config
202 # load from engine config
203 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
203 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
204 cfg = json.loads(f.read())
204 cfg = json.loads(f.read())
205 key = c.Session.key = cfg['exec_key']
205 key = c.Session.key = ensure_bytes(cfg['exec_key'])
206 xport,addr = cfg['url'].split('://')
206 xport,addr = cfg['url'].split('://')
207 c.HubFactory.engine_transport = xport
207 c.HubFactory.engine_transport = xport
208 ip,ports = addr.split(':')
208 ip,ports = addr.split(':')
209 c.HubFactory.engine_ip = ip
209 c.HubFactory.engine_ip = ip
210 c.HubFactory.regport = int(ports)
210 c.HubFactory.regport = int(ports)
211 self.location = cfg['location']
211 self.location = cfg['location']
212
213 # load client config
212 # load client config
214 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
213 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
215 cfg = json.loads(f.read())
214 cfg = json.loads(f.read())
@@ -240,9 +239,9 b' class IPControllerApp(BaseParallelApplication):'
240 # with open(keyfile, 'w') as f:
239 # with open(keyfile, 'w') as f:
241 # f.write(key)
240 # f.write(key)
242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
241 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 c.Session.key = key
242 c.Session.key = ensure_bytes(key)
244 else:
243 else:
245 key = c.Session.key = ''
244 key = c.Session.key = b''
246
245
247 try:
246 try:
248 self.factory = HubFactory(config=c, log=self.log)
247 self.factory = HubFactory(config=c, log=self.log)
@@ -273,27 +272,27 b' class IPControllerApp(BaseParallelApplication):'
273 hub = self.factory
272 hub = self.factory
274 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
273 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
275 # IOPub relay (in a Process)
274 # IOPub relay (in a Process)
276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
275 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
277 q.bind_in(hub.client_info['iopub'])
276 q.bind_in(hub.client_info['iopub'])
278 q.bind_out(hub.engine_info['iopub'])
277 q.bind_out(hub.engine_info['iopub'])
279 q.setsockopt_out(zmq.SUBSCRIBE, '')
278 q.setsockopt_out(zmq.SUBSCRIBE, b'')
280 q.connect_mon(hub.monitor_url)
279 q.connect_mon(hub.monitor_url)
281 q.daemon=True
280 q.daemon=True
282 children.append(q)
281 children.append(q)
283
282
284 # Multiplexer Queue (in a Process)
283 # Multiplexer Queue (in a Process)
285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
284 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
286 q.bind_in(hub.client_info['mux'])
285 q.bind_in(hub.client_info['mux'])
287 q.setsockopt_in(zmq.IDENTITY, 'mux')
286 q.setsockopt_in(zmq.IDENTITY, b'mux')
288 q.bind_out(hub.engine_info['mux'])
287 q.bind_out(hub.engine_info['mux'])
289 q.connect_mon(hub.monitor_url)
288 q.connect_mon(hub.monitor_url)
290 q.daemon=True
289 q.daemon=True
291 children.append(q)
290 children.append(q)
292
291
293 # Control Queue (in a Process)
292 # Control Queue (in a Process)
294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
293 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
295 q.bind_in(hub.client_info['control'])
294 q.bind_in(hub.client_info['control'])
296 q.setsockopt_in(zmq.IDENTITY, 'control')
295 q.setsockopt_in(zmq.IDENTITY, b'control')
297 q.bind_out(hub.engine_info['control'])
296 q.bind_out(hub.engine_info['control'])
298 q.connect_mon(hub.monitor_url)
297 q.connect_mon(hub.monitor_url)
299 q.daemon=True
298 q.daemon=True
@@ -305,10 +304,10 b' class IPControllerApp(BaseParallelApplication):'
305 # Task Queue (in a Process)
304 # Task Queue (in a Process)
306 if scheme == 'pure':
305 if scheme == 'pure':
307 self.log.warn("task::using pure XREQ Task scheduler")
306 self.log.warn("task::using pure XREQ Task scheduler")
308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
307 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
309 # q.setsockopt_out(zmq.HWM, hub.hwm)
308 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 q.bind_in(hub.client_info['task'][1])
309 q.bind_in(hub.client_info['task'][1])
311 q.setsockopt_in(zmq.IDENTITY, 'task')
310 q.setsockopt_in(zmq.IDENTITY, b'task')
312 q.bind_out(hub.engine_info['task'])
311 q.bind_out(hub.engine_info['task'])
313 q.connect_mon(hub.monitor_url)
312 q.connect_mon(hub.monitor_url)
314 q.daemon=True
313 q.daemon=True
@@ -41,7 +41,7 b' from IPython.config.configurable import Configurable'
41 from IPython.zmq.session import Session
41 from IPython.zmq.session import Session
42 from IPython.parallel.engine.engine import EngineFactory
42 from IPython.parallel.engine.engine import EngineFactory
43 from IPython.parallel.engine.streamkernel import Kernel
43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url
44 from IPython.parallel.util import disambiguate_url, ensure_bytes
45
45
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
@@ -216,11 +216,8 b' class IPEngineApp(BaseParallelApplication):'
216 self.log.info("Loading url_file %r"%self.url_file)
216 self.log.info("Loading url_file %r"%self.url_file)
217 with open(self.url_file) as f:
217 with open(self.url_file) as f:
218 d = json.loads(f.read())
218 d = json.loads(f.read())
219 for k,v in d.iteritems():
220 if isinstance(v, unicode):
221 d[k] = v.encode()
222 if d['exec_key']:
219 if d['exec_key']:
223 config.Session.key = d['exec_key']
220 config.Session.key = ensure_bytes(d['exec_key'])
224 d['url'] = disambiguate_url(d['url'], d['location'])
221 d['url'] = disambiguate_url(d['url'], d['location'])
225 config.EngineFactory.url = d['url']
222 config.EngineFactory.url = d['url']
226 config.EngineFactory.location = d['location']
223 config.EngineFactory.location = d['location']
@@ -17,6 +17,7 b' Authors:'
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import time
21 import time
21 import warnings
22 import warnings
22 from datetime import datetime
23 from datetime import datetime
@@ -48,6 +49,11 b' from .asyncresult import AsyncResult, AsyncHubResult'
48 from IPython.core.profiledir import ProfileDir, ProfileDirError
49 from IPython.core.profiledir import ProfileDir, ProfileDirError
49 from .view import DirectView, LoadBalancedView
50 from .view import DirectView, LoadBalancedView
50
51
52 if sys.version_info[0] >= 3:
53 # xrange is used in a coupe 'isinstance' tests in py2
54 # should be just 'range' in 3k
55 xrange = range
56
51 #--------------------------------------------------------------------------
57 #--------------------------------------------------------------------------
52 # Decorators for Client methods
58 # Decorators for Client methods
53 #--------------------------------------------------------------------------
59 #--------------------------------------------------------------------------
@@ -356,13 +362,12 b' class Client(HasTraits):'
356 if os.path.isfile(exec_key):
362 if os.path.isfile(exec_key):
357 extra_args['keyfile'] = exec_key
363 extra_args['keyfile'] = exec_key
358 else:
364 else:
359 if isinstance(exec_key, unicode):
365 exec_key = util.ensure_bytes(exec_key)
360 exec_key = exec_key.encode('ascii')
361 extra_args['key'] = exec_key
366 extra_args['key'] = exec_key
362 self.session = Session(**extra_args)
367 self.session = Session(**extra_args)
363
368
364 self._query_socket = self._context.socket(zmq.XREQ)
369 self._query_socket = self._context.socket(zmq.XREQ)
365 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
370 self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
366 if self._ssh:
371 if self._ssh:
367 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
372 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
368 else:
373 else:
@@ -404,7 +409,7 b' class Client(HasTraits):'
404 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
409 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
405 for k,v in engines.iteritems():
410 for k,v in engines.iteritems():
406 eid = int(k)
411 eid = int(k)
407 self._engines[eid] = bytes(v) # force not unicode
412 self._engines[eid] = v
408 self._ids.append(eid)
413 self._ids.append(eid)
409 self._ids = sorted(self._ids)
414 self._ids = sorted(self._ids)
410 if sorted(self._engines.keys()) != range(len(self._engines)) and \
415 if sorted(self._engines.keys()) != range(len(self._engines)) and \
@@ -455,7 +460,7 b' class Client(HasTraits):'
455 if not isinstance(targets, (tuple, list, xrange)):
460 if not isinstance(targets, (tuple, list, xrange)):
456 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
461 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
457
462
458 return [self._engines[t] for t in targets], list(targets)
463 return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets)
459
464
460 def _connect(self, sshserver, ssh_kwargs, timeout):
465 def _connect(self, sshserver, ssh_kwargs, timeout):
461 """setup all our socket connections to the cluster. This is called from
466 """setup all our socket connections to the cluster. This is called from
@@ -488,14 +493,15 b' class Client(HasTraits):'
488 content = msg.content
493 content = msg.content
489 self._config['registration'] = dict(content)
494 self._config['registration'] = dict(content)
490 if content.status == 'ok':
495 if content.status == 'ok':
496 ident = util.ensure_bytes(self.session.session)
491 if content.mux:
497 if content.mux:
492 self._mux_socket = self._context.socket(zmq.XREQ)
498 self._mux_socket = self._context.socket(zmq.XREQ)
493 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
499 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
494 connect_socket(self._mux_socket, content.mux)
500 connect_socket(self._mux_socket, content.mux)
495 if content.task:
501 if content.task:
496 self._task_scheme, task_addr = content.task
502 self._task_scheme, task_addr = content.task
497 self._task_socket = self._context.socket(zmq.XREQ)
503 self._task_socket = self._context.socket(zmq.XREQ)
498 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
504 self._task_socket.setsockopt(zmq.IDENTITY, ident)
499 connect_socket(self._task_socket, task_addr)
505 connect_socket(self._task_socket, task_addr)
500 if content.notification:
506 if content.notification:
501 self._notification_socket = self._context.socket(zmq.SUB)
507 self._notification_socket = self._context.socket(zmq.SUB)
@@ -507,12 +513,12 b' class Client(HasTraits):'
507 # connect_socket(self._query_socket, content.query)
513 # connect_socket(self._query_socket, content.query)
508 if content.control:
514 if content.control:
509 self._control_socket = self._context.socket(zmq.XREQ)
515 self._control_socket = self._context.socket(zmq.XREQ)
510 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
516 self._control_socket.setsockopt(zmq.IDENTITY, ident)
511 connect_socket(self._control_socket, content.control)
517 connect_socket(self._control_socket, content.control)
512 if content.iopub:
518 if content.iopub:
513 self._iopub_socket = self._context.socket(zmq.SUB)
519 self._iopub_socket = self._context.socket(zmq.SUB)
514 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
520 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
515 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
521 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
516 connect_socket(self._iopub_socket, content.iopub)
522 connect_socket(self._iopub_socket, content.iopub)
517 self._update_engines(dict(content.engines))
523 self._update_engines(dict(content.engines))
518 else:
524 else:
@@ -13,8 +13,6 b' Authors:'
13
13
14 """
14 """
15
15
16 __docformat__ = "restructuredtext en"
17
18 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
19 # Copyright (C) 2008-2011 The IPython Development Team
17 # Copyright (C) 2008-2011 The IPython Development Team
20 #
18 #
@@ -26,6 +24,8 b' __docformat__ = "restructuredtext en"'
26 # Imports
24 # Imports
27 #-------------------------------------------------------------------------------
25 #-------------------------------------------------------------------------------
28
26
27 from __future__ import division
28
29 import types
29 import types
30
30
31 from IPython.utils.data import flatten as utils_flatten
31 from IPython.utils.data import flatten as utils_flatten
@@ -67,7 +67,7 b' class Map:'
67 return
67 return
68
68
69 remainder = len(seq)%q
69 remainder = len(seq)%q
70 basesize = len(seq)/q
70 basesize = len(seq)//q
71 hi = []
71 hi = []
72 lo = []
72 lo = []
73 for n in range(q):
73 for n in range(q):
@@ -16,6 +16,8 b' Authors:'
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import division
20
19 import warnings
21 import warnings
20
22
21 from IPython.testing.skipdoctest import skip_doctest
23 from IPython.testing.skipdoctest import skip_doctest
@@ -142,7 +144,7 b' class ParallelFunction(RemoteFunction):'
142 balanced = 'Balanced' in self.view.__class__.__name__
144 balanced = 'Balanced' in self.view.__class__.__name__
143 if balanced:
145 if balanced:
144 if self.chunksize:
146 if self.chunksize:
145 nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0)
147 nparts = len_0//self.chunksize + int(len_0%self.chunksize > 0)
146 else:
148 else:
147 nparts = len_0
149 nparts = len_0
148 targets = [None]*nparts
150 targets = [None]*nparts
@@ -169,7 +171,10 b' class ParallelFunction(RemoteFunction):'
169
171
170 # print (args)
172 # print (args)
171 if hasattr(self, '_map'):
173 if hasattr(self, '_map'):
172 f = map
174 if sys.version_info[0] >= 3:
175 f = lambda f, *sequences: list(map(f, *sequences))
176 else:
177 f = map
173 args = [self.func]+args
178 args = [self.func]+args
174 else:
179 else:
175 f=self.func
180 f=self.func
@@ -969,6 +969,8 b' class LoadBalancedView(View):'
969 idents = []
969 idents = []
970 else:
970 else:
971 idents = self.client._build_targets(targets)[0]
971 idents = self.client._build_targets(targets)[0]
972 # ensure *not* bytes
973 idents = [ ident.decode() for ident in idents ]
972
974
973 after = self._render_dependency(after)
975 after = self._render_dependency(after)
974 follow = self._render_dependency(follow)
976 follow = self._render_dependency(follow)
@@ -25,6 +25,8 b' from zmq.eventloop import ioloop, zmqstream'
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat
26 from IPython.utils.traitlets import Set, Instance, CFloat
27
27
28 from IPython.parallel.util import ensure_bytes
29
28 class Heart(object):
30 class Heart(object):
29 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
30 This is a simple wrapper with defaults for the most common
32 This is a simple wrapper with defaults for the most common
@@ -42,9 +44,9 b' class Heart(object):'
42 self.device.connect_in(in_addr)
44 self.device.connect_in(in_addr)
43 self.device.connect_out(out_addr)
45 self.device.connect_out(out_addr)
44 if in_type == zmq.SUB:
46 if in_type == zmq.SUB:
45 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
46 if heart_id is None:
48 if heart_id is None:
47 heart_id = str(uuid.uuid4())
49 heart_id = ensure_bytes(uuid.uuid4())
48 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
49 self.id = heart_id
51 self.id = heart_id
50
52
@@ -115,7 +117,7 b' class HeartMonitor(LoggingConfigurable):'
115 self.responses = set()
117 self.responses = set()
116 # print self.on_probation, self.hearts
118 # print self.on_probation, self.hearts
117 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
118 self.pingstream.send(str(self.lifetime))
120 self.pingstream.send(ensure_bytes(str(self.lifetime)))
119
121
120 def handle_new_heart(self, heart):
122 def handle_new_heart(self, heart):
121 if self._new_handlers:
123 if self._new_handlers:
@@ -140,11 +142,13 b' class HeartMonitor(LoggingConfigurable):'
140
142
141 def handle_pong(self, msg):
143 def handle_pong(self, msg):
142 "a heart just beat"
144 "a heart just beat"
143 if msg[1] == str(self.lifetime):
145 current = ensure_bytes(str(self.lifetime))
146 last = ensure_bytes(str(self.last_ping))
147 if msg[1] == current:
144 delta = time.time()-self.tic
148 delta = time.time()-self.tic
145 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
146 self.responses.add(msg[0])
150 self.responses.add(msg[0])
147 elif msg[1] == str(self.last_ping):
151 elif msg[1] == last:
148 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
152 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
149 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
153 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
150 self.responses.add(msg[0])
154 self.responses.add(msg[0])
@@ -289,7 +289,7 b' class HubFactory(RegistrationFactory):'
289 # resubmit stream
289 # resubmit stream
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
291 url = util.disambiguate_url(self.client_info['task'][-1])
291 url = util.disambiguate_url(self.client_info['task'][-1])
292 r.setsockopt(zmq.IDENTITY, self.session.session)
292 r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
293 r.connect(url)
293 r.connect(url)
294
294
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -380,14 +380,14 b' class Hub(SessionFactory):'
380 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
380 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
381 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
381 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
382
382
383 self.monitor_handlers = { 'in' : self.save_queue_request,
383 self.monitor_handlers = {b'in' : self.save_queue_request,
384 'out': self.save_queue_result,
384 b'out': self.save_queue_result,
385 'intask': self.save_task_request,
385 b'intask': self.save_task_request,
386 'outtask': self.save_task_result,
386 b'outtask': self.save_task_result,
387 'tracktask': self.save_task_destination,
387 b'tracktask': self.save_task_destination,
388 'incontrol': _passer,
388 b'incontrol': _passer,
389 'outcontrol': _passer,
389 b'outcontrol': _passer,
390 'iopub': self.save_iopub_message,
390 b'iopub': self.save_iopub_message,
391 }
391 }
392
392
393 self.query_handlers = {'queue_request': self.queue_status,
393 self.query_handlers = {'queue_request': self.queue_status,
@@ -562,8 +562,9 b' class Hub(SessionFactory):'
562 return
562 return
563 record = init_record(msg)
563 record = init_record(msg)
564 msg_id = record['msg_id']
564 msg_id = record['msg_id']
565 record['engine_uuid'] = queue_id
565 # Unicode in records
566 record['client_uuid'] = client_id
566 record['engine_uuid'] = queue_id.decode('utf8', 'replace')
567 record['client_uuid'] = client_id.decode('utf8', 'replace')
567 record['queue'] = 'mux'
568 record['queue'] = 'mux'
568
569
569 try:
570 try:
@@ -751,7 +752,7 b' class Hub(SessionFactory):'
751 # print (content)
752 # print (content)
752 msg_id = content['msg_id']
753 msg_id = content['msg_id']
753 engine_uuid = content['engine_id']
754 engine_uuid = content['engine_id']
754 eid = self.by_ident[engine_uuid]
755 eid = self.by_ident[util.ensure_bytes(engine_uuid)]
755
756
756 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
757 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
757 if msg_id in self.unassigned:
758 if msg_id in self.unassigned:
@@ -833,7 +834,7 b' class Hub(SessionFactory):'
833 jsonable = {}
834 jsonable = {}
834 for k,v in self.keytable.iteritems():
835 for k,v in self.keytable.iteritems():
835 if v not in self.dead_engines:
836 if v not in self.dead_engines:
836 jsonable[str(k)] = v
837 jsonable[str(k)] = v.decode()
837 content['engines'] = jsonable
838 content['engines'] = jsonable
838 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839
840
@@ -841,11 +842,13 b' class Hub(SessionFactory):'
841 """Register a new engine."""
842 """Register a new engine."""
842 content = msg['content']
843 content = msg['content']
843 try:
844 try:
844 queue = content['queue']
845 queue = util.ensure_bytes(content['queue'])
845 except KeyError:
846 except KeyError:
846 self.log.error("registration::queue not specified", exc_info=True)
847 self.log.error("registration::queue not specified", exc_info=True)
847 return
848 return
848 heart = content.get('heartbeat', None)
849 heart = content.get('heartbeat', None)
850 if heart:
851 heart = util.ensure_bytes(heart)
849 """register a new engine, and create the socket(s) necessary"""
852 """register a new engine, and create the socket(s) necessary"""
850 eid = self._next_id
853 eid = self._next_id
851 # print (eid, queue, reg, heart)
854 # print (eid, queue, reg, heart)
@@ -912,7 +915,7 b' class Hub(SessionFactory):'
912 self.log.info("registration::unregister_engine(%r)"%eid)
915 self.log.info("registration::unregister_engine(%r)"%eid)
913 # print (eid)
916 # print (eid)
914 uuid = self.keytable[eid]
917 uuid = self.keytable[eid]
915 content=dict(id=eid, queue=uuid)
918 content=dict(id=eid, queue=uuid.decode())
916 self.dead_engines.add(uuid)
919 self.dead_engines.add(uuid)
917 # self.ids.remove(eid)
920 # self.ids.remove(eid)
918 # uuid = self.keytable.pop(eid)
921 # uuid = self.keytable.pop(eid)
@@ -980,7 +983,7 b' class Hub(SessionFactory):'
980 self.tasks[eid] = list()
983 self.tasks[eid] = list()
981 self.completed[eid] = list()
984 self.completed[eid] = list()
982 self.hearts[heart] = eid
985 self.hearts[heart] = eid
983 content = dict(id=eid, queue=self.engines[eid].queue)
986 content = dict(id=eid, queue=self.engines[eid].queue.decode())
984 if self.notifier:
987 if self.notifier:
985 self.session.send(self.notifier, "registration_notification", content=content)
988 self.session.send(self.notifier, "registration_notification", content=content)
986 self.log.info("engine::Engine Connected: %i"%eid)
989 self.log.info("engine::Engine Connected: %i"%eid)
@@ -1054,9 +1057,9 b' class Hub(SessionFactory):'
1054 queue = len(queue)
1057 queue = len(queue)
1055 completed = len(completed)
1058 completed = len(completed)
1056 tasks = len(tasks)
1059 tasks = len(tasks)
1057 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1060 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1058 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1061 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1059
1062 # print (content)
1060 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1063 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1061
1064
1062 def purge_results(self, client_id, msg):
1065 def purge_results(self, client_id, msg):
@@ -1179,7 +1182,7 b' class Hub(SessionFactory):'
1179 'io' : io_dict,
1182 'io' : io_dict,
1180 }
1183 }
1181 if rec['result_buffers']:
1184 if rec['result_buffers']:
1182 buffers = map(str, rec['result_buffers'])
1185 buffers = map(bytes, rec['result_buffers'])
1183 else:
1186 else:
1184 buffers = []
1187 buffers = []
1185
1188
@@ -1281,7 +1284,7 b' class Hub(SessionFactory):'
1281 buffers.extend(rb)
1284 buffers.extend(rb)
1282 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1285 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1283 result_buffer_lens=result_buffer_lens)
1286 result_buffer_lens=result_buffer_lens)
1284
1287 # self.log.debug (content)
1285 self.session.send(self.query, "db_reply", content=content,
1288 self.session.send(self.query, "db_reply", content=content,
1286 parent=msg, ident=client_id,
1289 parent=msg, ident=client_id,
1287 buffers=buffers)
1290 buffers=buffers)
@@ -40,11 +40,11 b' from zmq.eventloop import ioloop, zmqstream'
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger
47 from IPython.parallel.util import connect_logger, local_logger, ensure_bytes
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
@@ -174,6 +174,10 b' class TaskScheduler(SessionFactory):'
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176
176
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 # but ensure Bytes
179 def _ident_default(self):
180 return ensure_bytes(self.session.session)
177
181
178 def start(self):
182 def start(self):
179 self.engine_stream.on_recv(self.dispatch_result, copy=False)
183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
@@ -204,7 +208,7 b' class TaskScheduler(SessionFactory):'
204 try:
208 try:
205 idents,msg = self.session.feed_identities(msg)
209 idents,msg = self.session.feed_identities(msg)
206 except ValueError:
210 except ValueError:
207 self.log.warn("task::Invalid Message: %r"%msg)
211 self.log.warn("task::Invalid Message: %r",msg)
208 return
212 return
209 try:
213 try:
210 msg = self.session.unpack_message(msg)
214 msg = self.session.unpack_message(msg)
@@ -219,15 +223,16 b' class TaskScheduler(SessionFactory):'
219 self.log.error("Unhandled message type: %r"%msg_type)
223 self.log.error("Unhandled message type: %r"%msg_type)
220 else:
224 else:
221 try:
225 try:
222 handler(str(msg['content']['queue']))
226 handler(ensure_bytes(msg['content']['queue']))
223 except KeyError:
227 except Exception:
224 self.log.error("task::Invalid notification msg: %r"%msg)
228 self.log.error("task::Invalid notification msg: %r",msg)
225
229
226 def _register_engine(self, uid):
230 def _register_engine(self, uid):
227 """New engine with ident `uid` became available."""
231 """New engine with ident `uid` became available."""
228 # head of the line:
232 # head of the line:
229 self.targets.insert(0,uid)
233 self.targets.insert(0,uid)
230 self.loads.insert(0,0)
234 self.loads.insert(0,0)
235
231 # initialize sets
236 # initialize sets
232 self.completed[uid] = set()
237 self.completed[uid] = set()
233 self.failed[uid] = set()
238 self.failed[uid] = set()
@@ -309,14 +314,18 b' class TaskScheduler(SessionFactory):'
309
314
310
315
311 # send to monitor
316 # send to monitor
312 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
313
318
314 header = msg['header']
319 header = msg['header']
315 msg_id = header['msg_id']
320 msg_id = header['msg_id']
316 self.all_ids.add(msg_id)
321 self.all_ids.add(msg_id)
317
322
318 # targets
323 # get targets as a set of bytes objects
319 targets = set(header.get('targets', []))
324 # from a list of unicode objects
325 targets = header.get('targets', [])
326 targets = map(ensure_bytes, targets)
327 targets = set(targets)
328
320 retries = header.get('retries', 0)
329 retries = header.get('retries', 0)
321 self.retries[msg_id] = retries
330 self.retries[msg_id] = retries
322
331
@@ -412,7 +421,7 b' class TaskScheduler(SessionFactory):'
412
421
413 msg = self.session.send(self.client_stream, 'apply_reply', content,
422 msg = self.session.send(self.client_stream, 'apply_reply', content,
414 parent=header, ident=idents)
423 parent=header, ident=idents)
415 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
424 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
416
425
417 self.update_graph(msg_id, success=False)
426 self.update_graph(msg_id, success=False)
418
427
@@ -494,9 +503,9 b' class TaskScheduler(SessionFactory):'
494 self.add_job(idx)
503 self.add_job(idx)
495 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
504 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
496 # notify Hub
505 # notify Hub
497 content = dict(msg_id=msg_id, engine_id=target)
506 content = dict(msg_id=msg_id, engine_id=target.decode())
498 self.session.send(self.mon_stream, 'task_destination', content=content,
507 self.session.send(self.mon_stream, 'task_destination', content=content,
499 ident=['tracktask',self.session.session])
508 ident=[b'tracktask',self.ident])
500
509
501
510
502 #-----------------------------------------------------------------------
511 #-----------------------------------------------------------------------
@@ -533,7 +542,7 b' class TaskScheduler(SessionFactory):'
533 # relay to client and update graph
542 # relay to client and update graph
534 self.handle_result(idents, parent, raw_msg, success)
543 self.handle_result(idents, parent, raw_msg, success)
535 # send to Hub monitor
544 # send to Hub monitor
536 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
545 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
537 else:
546 else:
538 self.handle_unmet_dependency(idents, parent)
547 self.handle_unmet_dependency(idents, parent)
539
548
@@ -28,6 +28,12 b' from IPython.utils.jsonutil import date_default, extract_dates, squash_dates'
28 # SQLite operators, adapters, and converters
28 # SQLite operators, adapters, and converters
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30
30
31 try:
32 buffer
33 except NameError:
34 # py3k
35 buffer = memoryview
36
31 operators = {
37 operators = {
32 '$lt' : "<",
38 '$lt' : "<",
33 '$gt' : ">",
39 '$gt' : ">",
@@ -54,7 +60,11 b' def _convert_dict(ds):'
54 if ds is None:
60 if ds is None:
55 return ds
61 return ds
56 else:
62 else:
57 return extract_dates(json.loads(ds))
63 if isinstance(ds, bytes):
64 # If I understand the sqlite doc correctly, this will always be utf8
65 ds = ds.decode('utf8')
66 d = json.loads(ds)
67 return extract_dates(d)
58
68
59 def _adapt_bufs(bufs):
69 def _adapt_bufs(bufs):
60 # this is *horrible*
70 # this is *horrible*
@@ -28,7 +28,7 b' from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode'
28
28
29 from IPython.parallel.controller.heartmonitor import Heart
29 from IPython.parallel.controller.heartmonitor import Heart
30 from IPython.parallel.factory import RegistrationFactory
30 from IPython.parallel.factory import RegistrationFactory
31 from IPython.parallel.util import disambiguate_url
31 from IPython.parallel.util import disambiguate_url, ensure_bytes
32
32
33 from IPython.zmq.session import Message
33 from IPython.zmq.session import Message
34
34
@@ -65,7 +65,7 b' class EngineFactory(RegistrationFactory):'
65 ctx = self.context
65 ctx = self.context
66
66
67 reg = ctx.socket(zmq.XREQ)
67 reg = ctx.socket(zmq.XREQ)
68 reg.setsockopt(zmq.IDENTITY, self.ident)
68 reg.setsockopt(zmq.IDENTITY, ensure_bytes(self.ident))
69 reg.connect(self.url)
69 reg.connect(self.url)
70 self.registrar = zmqstream.ZMQStream(reg, self.loop)
70 self.registrar = zmqstream.ZMQStream(reg, self.loop)
71
71
@@ -83,7 +83,7 b' class EngineFactory(RegistrationFactory):'
83 self._abort_dc.stop()
83 self._abort_dc.stop()
84 ctx = self.context
84 ctx = self.context
85 loop = self.loop
85 loop = self.loop
86 identity = self.ident
86 identity = ensure_bytes(self.ident)
87
87
88 idents,msg = self.session.feed_identities(msg)
88 idents,msg = self.session.feed_identities(msg)
89 msg = Message(self.session.unpack_message(msg))
89 msg = Message(self.session.unpack_message(msg))
@@ -35,12 +35,12 b' import zmq'
35 from zmq.eventloop import ioloop, zmqstream
35 from zmq.eventloop import ioloop, zmqstream
36
36
37 # Local imports.
37 # Local imports.
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
39 from IPython.zmq.completer import KernelCompleter
39 from IPython.zmq.completer import KernelCompleter
40
40
41 from IPython.parallel.error import wrap_exception
41 from IPython.parallel.error import wrap_exception
42 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message
43 from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes
44
44
45 def printer(*args):
45 def printer(*args):
46 pprint(args, stream=sys.__stdout__)
46 pprint(args, stream=sys.__stdout__)
@@ -73,8 +73,14 b' class Kernel(SessionFactory):'
73 # kwargs:
73 # kwargs:
74 exec_lines = List(Unicode, config=True,
74 exec_lines = List(Unicode, config=True,
75 help="List of lines to execute")
75 help="List of lines to execute")
76
76
77 # identities:
77 int_id = Int(-1)
78 int_id = Int(-1)
79 bident = CBytes()
80 ident = Unicode()
81 def _ident_changed(self, name, old, new):
82 self.bident = ensure_bytes(new)
83
78 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
79
85
80 control_stream = Instance(zmqstream.ZMQStream)
86 control_stream = Instance(zmqstream.ZMQStream)
@@ -193,6 +199,8 b' class Kernel(SessionFactory):'
193 except:
199 except:
194 self.log.error("Invalid Message", exc_info=True)
200 self.log.error("Invalid Message", exc_info=True)
195 return
201 return
202 else:
203 self.log.debug("Control received, %s", msg)
196
204
197 header = msg['header']
205 header = msg['header']
198 msg_id = header['msg_id']
206 msg_id = header['msg_id']
@@ -247,7 +255,7 b' class Kernel(SessionFactory):'
247 self.log.error("Got bad msg: %s"%parent, exc_info=True)
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
248 return
256 return
249 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
250 ident='%s.pyin'%self.prefix)
258 ident=ensure_bytes('%s.pyin'%self.prefix))
251 started = datetime.now()
259 started = datetime.now()
252 try:
260 try:
253 comp_code = self.compiler(code, '<zmq-kernel>')
261 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -261,7 +269,7 b' class Kernel(SessionFactory):'
261 exc_content = self._wrap_exception('execute')
269 exc_content = self._wrap_exception('execute')
262 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
263 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
264 ident='%s.pyerr'%self.prefix)
272 ident=ensure_bytes('%s.pyerr'%self.prefix))
265 reply_content = exc_content
273 reply_content = exc_content
266 else:
274 else:
267 reply_content = {'status' : 'ok'}
275 reply_content = {'status' : 'ok'}
@@ -285,7 +293,6 b' class Kernel(SessionFactory):'
285 def apply_request(self, stream, ident, parent):
293 def apply_request(self, stream, ident, parent):
286 # flush previous reply, so this request won't block it
294 # flush previous reply, so this request won't block it
287 stream.flush(zmq.POLLOUT)
295 stream.flush(zmq.POLLOUT)
288
289 try:
296 try:
290 content = parent[u'content']
297 content = parent[u'content']
291 bufs = parent[u'buffers']
298 bufs = parent[u'buffers']
@@ -341,7 +348,7 b' class Kernel(SessionFactory):'
341 exc_content = self._wrap_exception('apply')
348 exc_content = self._wrap_exception('apply')
342 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
343 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
344 ident='%s.pyerr'%self.prefix)
351 ident=ensure_bytes('%s.pyerr'%self.prefix))
345 reply_content = exc_content
352 reply_content = exc_content
346 result_buf = []
353 result_buf = []
347
354
@@ -370,6 +377,8 b' class Kernel(SessionFactory):'
370 except:
377 except:
371 self.log.error("Invalid Message", exc_info=True)
378 self.log.error("Invalid Message", exc_info=True)
372 return
379 return
380 else:
381 self.log.debug("Message received, %s", msg)
373
382
374
383
375 header = msg['header']
384 header = msg['header']
@@ -41,9 +41,11 b' def crash():'
41 if sys.platform.startswith('win'):
41 if sys.platform.startswith('win'):
42 import ctypes
42 import ctypes
43 ctypes.windll.kernel32.SetErrorMode(0x0002);
43 ctypes.windll.kernel32.SetErrorMode(0x0002);
44
44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
45 co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
45 if sys.version_info[0] >= 3:
46 (), (), (), '', '', 1, b'')
46 args.insert(1, 0)
47
48 co = types.CodeType(*args)
47 exec(co)
49 exec(co)
48
50
49 def wait(n):
51 def wait(n):
@@ -57,7 +57,7 b' class AsyncResultTest(ClusterTestCase):'
57
57
58 def test_get_after_error(self):
58 def test_get_after_error(self):
59 ar = self.client[-1].apply_async(lambda : 1/0)
59 ar = self.client[-1].apply_async(lambda : 1/0)
60 ar.wait()
60 ar.wait(10)
61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
@@ -16,6 +16,8 b' Authors:'
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
20
19 import time
21 import time
20 from datetime import datetime
22 from datetime import datetime
21 from tempfile import mktemp
23 from tempfile import mktemp
@@ -132,7 +134,9 b' class TestClient(ClusterTestCase):'
132 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
134 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
133 allqs = self.client.queue_status()
135 allqs = self.client.queue_status()
134 self.assertTrue(isinstance(allqs, dict))
136 self.assertTrue(isinstance(allqs, dict))
135 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
137 intkeys = list(allqs.keys())
138 intkeys.remove('unassigned')
139 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
136 unassigned = allqs.pop('unassigned')
140 unassigned = allqs.pop('unassigned')
137 for eid,qs in allqs.items():
141 for eid,qs in allqs.items():
138 self.assertTrue(isinstance(qs, dict))
142 self.assertTrue(isinstance(qs, dict))
@@ -156,7 +160,7 b' class TestClient(ClusterTestCase):'
156 def test_db_query_dt(self):
160 def test_db_query_dt(self):
157 """test db query by date"""
161 """test db query by date"""
158 hist = self.client.hub_history()
162 hist = self.client.hub_history()
159 middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0]
163 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
160 tic = middle['submitted']
164 tic = middle['submitted']
161 before = self.client.db_query({'submitted' : {'$lt' : tic}})
165 before = self.client.db_query({'submitted' : {'$lt' : tic}})
162 after = self.client.db_query({'submitted' : {'$gte' : tic}})
166 after = self.client.db_query({'submitted' : {'$gte' : tic}})
@@ -16,6 +16,7 b' Authors:'
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19
20
20 import tempfile
21 import tempfile
21 import time
22 import time
@@ -100,7 +101,7 b' class TestDictBackend(TestCase):'
100 def test_find_records_dt(self):
101 def test_find_records_dt(self):
101 """test finding records by date"""
102 """test finding records by date"""
102 hist = self.db.get_history()
103 hist = self.db.get_history()
103 middle = self.db.get_record(hist[len(hist)/2])
104 middle = self.db.get_record(hist[len(hist)//2])
104 tic = middle['submitted']
105 tic = middle['submitted']
105 before = self.db.find_records({'submitted' : {'$lt' : tic}})
106 before = self.db.find_records({'submitted' : {'$lt' : tic}})
106 after = self.db.find_records({'submitted' : {'$gte' : tic}})
107 after = self.db.find_records({'submitted' : {'$gte' : tic}})
@@ -168,7 +169,7 b' class TestDictBackend(TestCase):'
168 query = {'msg_id' : {'$in':msg_ids}}
169 query = {'msg_id' : {'$in':msg_ids}}
169 self.db.drop_matching_records(query)
170 self.db.drop_matching_records(query)
170 recs = self.db.find_records(query)
171 recs = self.db.find_records(query)
171 self.assertTrue(len(recs)==0)
172 self.assertEquals(len(recs), 0)
172
173
173 class TestSQLiteBackend(TestDictBackend):
174 class TestSQLiteBackend(TestDictBackend):
174 def create_db(self):
175 def create_db(self):
@@ -43,7 +43,7 b' class TestView(ClusterTestCase):'
43 # self.add_engines(1)
43 # self.add_engines(1)
44 eid = self.client.ids[-1]
44 eid = self.client.ids[-1]
45 ar = self.client[eid].apply_async(crash)
45 ar = self.client[eid].apply_async(crash)
46 self.assertRaisesRemote(error.EngineError, ar.get)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 eid = ar.engine_id
47 eid = ar.engine_id
48 tic = time.time()
48 tic = time.time()
49 while eid in self.client.ids and time.time()-tic < 5:
49 while eid in self.client.ids and time.time()-tic < 5:
@@ -413,7 +413,10 b' class TestView(ClusterTestCase):'
413 """test executing unicode strings"""
413 """test executing unicode strings"""
414 v = self.client[-1]
414 v = self.client[-1]
415 v.block=True
415 v.block=True
416 code=u"a=u'é'"
416 if sys.version_info[0] >= 3:
417 code="a='é'"
418 else:
419 code=u"a=u'é'"
417 v.execute(code)
420 v.execute(code)
418 self.assertEquals(v['a'], u'é')
421 self.assertEquals(v['a'], u'é')
419
422
@@ -433,7 +436,7 b' class TestView(ClusterTestCase):'
433 assert isinstance(check, bytes), "%r is not bytes"%check
436 assert isinstance(check, bytes), "%r is not bytes"%check
434 assert a.encode('utf8') == check, "%s != %s"%(a,check)
437 assert a.encode('utf8') == check, "%s != %s"%(a,check)
435
438
436 for s in [ u'é', u'ßø®∫','asdf'.decode() ]:
439 for s in [ u'é', u'ßø®∫',u'asdf' ]:
437 try:
440 try:
438 v.apply_sync(check_unicode, s, s.encode('utf8'))
441 v.apply_sync(check_unicode, s, s.encode('utf8'))
439 except error.RemoteError as e:
442 except error.RemoteError as e:
@@ -101,6 +101,12 b' class ReverseDict(dict):'
101 # Functions
101 # Functions
102 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
103
103
104 def ensure_bytes(s):
105 """ensure that an object is bytes"""
106 if isinstance(s, unicode):
107 s = s.encode(sys.getdefaultencoding(), 'replace')
108 return s
109
104 def validate_url(url):
110 def validate_url(url):
105 """validate a url for zeromq"""
111 """validate a url for zeromq"""
106 if not isinstance(url, basestring):
112 if not isinstance(url, basestring):
@@ -23,6 +23,7 b' __docformat__ = "restructuredtext en"'
23 # Imports
23 # Imports
24 #-------------------------------------------------------------------------------
24 #-------------------------------------------------------------------------------
25
25
26 import sys
26 import new, types, copy_reg
27 import new, types, copy_reg
27
28
28 def code_ctor(*args):
29 def code_ctor(*args):
@@ -31,9 +32,12 b' def code_ctor(*args):'
31 def reduce_code(co):
32 def reduce_code(co):
32 if co.co_freevars or co.co_cellvars:
33 if co.co_freevars or co.co_cellvars:
33 raise ValueError("Sorry, cannot pickle code objects with closures")
34 raise ValueError("Sorry, cannot pickle code objects with closures")
34 return code_ctor, (co.co_argcount, co.co_nlocals, co.co_stacksize,
35 args = [co.co_argcount, co.co_nlocals, co.co_stacksize,
35 co.co_flags, co.co_code, co.co_consts, co.co_names,
36 co.co_flags, co.co_code, co.co_consts, co.co_names,
36 co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno,
37 co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno,
37 co.co_lnotab)
38 co.co_lnotab]
39 if sys.version_info[0] >= 3:
40 args.insert(1, co.co_kwonlyargcount)
41 return code_ctor, tuple(args)
38
42
39 copy_reg.pickle(types.CodeType, reduce_code) No newline at end of file
43 copy_reg.pickle(types.CodeType, reduce_code)
@@ -19,16 +19,23 b' __test__ = {}'
19 # Imports
19 # Imports
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21
21
22 import sys
22 import cPickle as pickle
23 import cPickle as pickle
23
24
24 try:
25 try:
25 import numpy
26 import numpy
26 except ImportError:
27 except ImportError:
27 pass
28 numpy = None
28
29
29 class SerializationError(Exception):
30 class SerializationError(Exception):
30 pass
31 pass
31
32
33 if sys.version_info[0] >= 3:
34 buffer = memoryview
35 py3k = True
36 else:
37 py3k = False
38
32 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
33 # Classes and functions
40 # Classes and functions
34 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
@@ -93,8 +100,11 b' class SerializeIt(object):'
93 def __init__(self, unSerialized):
100 def __init__(self, unSerialized):
94 self.data = None
101 self.data = None
95 self.obj = unSerialized.getObject()
102 self.obj = unSerialized.getObject()
96 if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
103 if numpy is not None and isinstance(self.obj, numpy.ndarray):
97 if len(self.obj.shape) == 0: # length 0 arrays are just pickled
104 if py3k or len(self.obj.shape) == 0: # length 0 arrays are just pickled
105 # FIXME:
106 # also use pickle for numpy arrays on py3k, since
107 # pyzmq doesn't rebuild from memoryviews properly
98 self.typeDescriptor = 'pickle'
108 self.typeDescriptor = 'pickle'
99 self.metadata = {}
109 self.metadata = {}
100 else:
110 else:
@@ -102,7 +112,7 b' class SerializeIt(object):'
102 self.typeDescriptor = 'ndarray'
112 self.typeDescriptor = 'ndarray'
103 self.metadata = {'shape':self.obj.shape,
113 self.metadata = {'shape':self.obj.shape,
104 'dtype':self.obj.dtype.str}
114 'dtype':self.obj.dtype.str}
105 elif isinstance(self.obj, str):
115 elif isinstance(self.obj, bytes):
106 self.typeDescriptor = 'bytes'
116 self.typeDescriptor = 'bytes'
107 self.metadata = {}
117 self.metadata = {}
108 elif isinstance(self.obj, buffer):
118 elif isinstance(self.obj, buffer):
@@ -146,9 +156,9 b' class UnSerializeIt(UnSerialized):'
146
156
147 def getObject(self):
157 def getObject(self):
148 typeDescriptor = self.serialized.getTypeDescriptor()
158 typeDescriptor = self.serialized.getTypeDescriptor()
149 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
159 if numpy is not None and typeDescriptor == 'ndarray':
150 buf = self.serialized.getData()
160 buf = self.serialized.getData()
151 if isinstance(buf, (str, buffer)):
161 if isinstance(buf, (bytes, buffer)):
152 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
162 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
153 else:
163 else:
154 # memoryview
164 # memoryview
General Comments 0
You need to be logged in to leave comments. Login now