##// END OF EJS Templates
update parallel code for py3k...
MinRK -
Show More
@@ -54,7 +54,7 b' from IPython.parallel.controller.hub import HubFactory'
54 54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 56
57 from IPython.parallel.util import signal_children, split_url
57 from IPython.parallel.util import signal_children, split_url, ensure_bytes
58 58
59 59 # conditional import of MongoDB backend class
60 60
@@ -202,14 +202,13 b' class IPControllerApp(BaseParallelApplication):'
202 202 # load from engine config
203 203 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
204 204 cfg = json.loads(f.read())
205 key = c.Session.key = cfg['exec_key']
205 key = c.Session.key = ensure_bytes(cfg['exec_key'])
206 206 xport,addr = cfg['url'].split('://')
207 207 c.HubFactory.engine_transport = xport
208 208 ip,ports = addr.split(':')
209 209 c.HubFactory.engine_ip = ip
210 210 c.HubFactory.regport = int(ports)
211 211 self.location = cfg['location']
212
213 212 # load client config
214 213 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
215 214 cfg = json.loads(f.read())
@@ -240,9 +239,9 b' class IPControllerApp(BaseParallelApplication):'
240 239 # with open(keyfile, 'w') as f:
241 240 # f.write(key)
242 241 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 c.Session.key = key
242 c.Session.key = ensure_bytes(key)
244 243 else:
245 key = c.Session.key = ''
244 key = c.Session.key = b''
246 245
247 246 try:
248 247 self.factory = HubFactory(config=c, log=self.log)
@@ -273,27 +272,27 b' class IPControllerApp(BaseParallelApplication):'
273 272 hub = self.factory
274 273 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
275 274 # IOPub relay (in a Process)
276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
275 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
277 276 q.bind_in(hub.client_info['iopub'])
278 277 q.bind_out(hub.engine_info['iopub'])
279 q.setsockopt_out(zmq.SUBSCRIBE, '')
278 q.setsockopt_out(zmq.SUBSCRIBE, b'')
280 279 q.connect_mon(hub.monitor_url)
281 280 q.daemon=True
282 281 children.append(q)
283 282
284 283 # Multiplexer Queue (in a Process)
285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
284 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
286 285 q.bind_in(hub.client_info['mux'])
287 q.setsockopt_in(zmq.IDENTITY, 'mux')
286 q.setsockopt_in(zmq.IDENTITY, b'mux')
288 287 q.bind_out(hub.engine_info['mux'])
289 288 q.connect_mon(hub.monitor_url)
290 289 q.daemon=True
291 290 children.append(q)
292 291
293 292 # Control Queue (in a Process)
294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
293 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
295 294 q.bind_in(hub.client_info['control'])
296 q.setsockopt_in(zmq.IDENTITY, 'control')
295 q.setsockopt_in(zmq.IDENTITY, b'control')
297 296 q.bind_out(hub.engine_info['control'])
298 297 q.connect_mon(hub.monitor_url)
299 298 q.daemon=True
@@ -305,10 +304,10 b' class IPControllerApp(BaseParallelApplication):'
305 304 # Task Queue (in a Process)
306 305 if scheme == 'pure':
307 306 self.log.warn("task::using pure XREQ Task scheduler")
308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
307 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
309 308 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 309 q.bind_in(hub.client_info['task'][1])
311 q.setsockopt_in(zmq.IDENTITY, 'task')
310 q.setsockopt_in(zmq.IDENTITY, b'task')
312 311 q.bind_out(hub.engine_info['task'])
313 312 q.connect_mon(hub.monitor_url)
314 313 q.daemon=True
@@ -41,7 +41,7 b' from IPython.config.configurable import Configurable'
41 41 from IPython.zmq.session import Session
42 42 from IPython.parallel.engine.engine import EngineFactory
43 43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url
44 from IPython.parallel.util import disambiguate_url, ensure_bytes
45 45
46 46 from IPython.utils.importstring import import_item
47 47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
@@ -216,11 +216,8 b' class IPEngineApp(BaseParallelApplication):'
216 216 self.log.info("Loading url_file %r"%self.url_file)
217 217 with open(self.url_file) as f:
218 218 d = json.loads(f.read())
219 for k,v in d.iteritems():
220 if isinstance(v, unicode):
221 d[k] = v.encode()
222 219 if d['exec_key']:
223 config.Session.key = d['exec_key']
220 config.Session.key = ensure_bytes(d['exec_key'])
224 221 d['url'] = disambiguate_url(d['url'], d['location'])
225 222 config.EngineFactory.url = d['url']
226 223 config.EngineFactory.location = d['location']
@@ -17,6 +17,7 b' Authors:'
17 17
18 18 import os
19 19 import json
20 import sys
20 21 import time
21 22 import warnings
22 23 from datetime import datetime
@@ -48,6 +49,11 b' from .asyncresult import AsyncResult, AsyncHubResult'
48 49 from IPython.core.profiledir import ProfileDir, ProfileDirError
49 50 from .view import DirectView, LoadBalancedView
50 51
52 if sys.version_info[0] >= 3:
53 # xrange is used in a couple of 'isinstance' tests in py2
54 # should be just 'range' in 3k
55 xrange = range
56
51 57 #--------------------------------------------------------------------------
52 58 # Decorators for Client methods
53 59 #--------------------------------------------------------------------------
@@ -356,13 +362,12 b' class Client(HasTraits):'
356 362 if os.path.isfile(exec_key):
357 363 extra_args['keyfile'] = exec_key
358 364 else:
359 if isinstance(exec_key, unicode):
360 exec_key = exec_key.encode('ascii')
365 exec_key = util.ensure_bytes(exec_key)
361 366 extra_args['key'] = exec_key
362 367 self.session = Session(**extra_args)
363 368
364 369 self._query_socket = self._context.socket(zmq.XREQ)
365 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
370 self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
366 371 if self._ssh:
367 372 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
368 373 else:
@@ -404,7 +409,7 b' class Client(HasTraits):'
404 409 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
405 410 for k,v in engines.iteritems():
406 411 eid = int(k)
407 self._engines[eid] = bytes(v) # force not unicode
412 self._engines[eid] = v
408 413 self._ids.append(eid)
409 414 self._ids = sorted(self._ids)
410 415 if sorted(self._engines.keys()) != range(len(self._engines)) and \
@@ -455,7 +460,7 b' class Client(HasTraits):'
455 460 if not isinstance(targets, (tuple, list, xrange)):
456 461 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
457 462
458 return [self._engines[t] for t in targets], list(targets)
463 return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets)
459 464
460 465 def _connect(self, sshserver, ssh_kwargs, timeout):
461 466 """setup all our socket connections to the cluster. This is called from
@@ -488,14 +493,15 b' class Client(HasTraits):'
488 493 content = msg.content
489 494 self._config['registration'] = dict(content)
490 495 if content.status == 'ok':
496 ident = util.ensure_bytes(self.session.session)
491 497 if content.mux:
492 498 self._mux_socket = self._context.socket(zmq.XREQ)
493 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
499 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
494 500 connect_socket(self._mux_socket, content.mux)
495 501 if content.task:
496 502 self._task_scheme, task_addr = content.task
497 503 self._task_socket = self._context.socket(zmq.XREQ)
498 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
504 self._task_socket.setsockopt(zmq.IDENTITY, ident)
499 505 connect_socket(self._task_socket, task_addr)
500 506 if content.notification:
501 507 self._notification_socket = self._context.socket(zmq.SUB)
@@ -507,12 +513,12 b' class Client(HasTraits):'
507 513 # connect_socket(self._query_socket, content.query)
508 514 if content.control:
509 515 self._control_socket = self._context.socket(zmq.XREQ)
510 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
516 self._control_socket.setsockopt(zmq.IDENTITY, ident)
511 517 connect_socket(self._control_socket, content.control)
512 518 if content.iopub:
513 519 self._iopub_socket = self._context.socket(zmq.SUB)
514 520 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
515 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
521 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
516 522 connect_socket(self._iopub_socket, content.iopub)
517 523 self._update_engines(dict(content.engines))
518 524 else:
@@ -13,8 +13,6 b' Authors:'
13 13
14 14 """
15 15
16 __docformat__ = "restructuredtext en"
17
18 16 #-------------------------------------------------------------------------------
19 17 # Copyright (C) 2008-2011 The IPython Development Team
20 18 #
@@ -26,6 +24,8 b' __docformat__ = "restructuredtext en"'
26 24 # Imports
27 25 #-------------------------------------------------------------------------------
28 26
27 from __future__ import division
28
29 29 import types
30 30
31 31 from IPython.utils.data import flatten as utils_flatten
@@ -67,7 +67,7 b' class Map:'
67 67 return
68 68
69 69 remainder = len(seq)%q
70 basesize = len(seq)/q
70 basesize = len(seq)//q
71 71 hi = []
72 72 lo = []
73 73 for n in range(q):
@@ -16,6 +16,8 b' Authors:'
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 from __future__ import division
20
19 21 import warnings
20 22
21 23 from IPython.testing.skipdoctest import skip_doctest
@@ -142,7 +144,7 b' class ParallelFunction(RemoteFunction):'
142 144 balanced = 'Balanced' in self.view.__class__.__name__
143 145 if balanced:
144 146 if self.chunksize:
145 nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0)
147 nparts = len_0//self.chunksize + int(len_0%self.chunksize > 0)
146 148 else:
147 149 nparts = len_0
148 150 targets = [None]*nparts
@@ -169,7 +171,10 b' class ParallelFunction(RemoteFunction):'
169 171
170 172 # print (args)
171 173 if hasattr(self, '_map'):
172 f = map
174 if sys.version_info[0] >= 3:
175 f = lambda f, *sequences: list(map(f, *sequences))
176 else:
177 f = map
173 178 args = [self.func]+args
174 179 else:
175 180 f=self.func
@@ -969,6 +969,8 b' class LoadBalancedView(View):'
969 969 idents = []
970 970 else:
971 971 idents = self.client._build_targets(targets)[0]
972 # ensure *not* bytes
973 idents = [ ident.decode() for ident in idents ]
972 974
973 975 after = self._render_dependency(after)
974 976 follow = self._render_dependency(follow)
@@ -25,6 +25,8 b' from zmq.eventloop import ioloop, zmqstream'
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.traitlets import Set, Instance, CFloat
27 27
28 from IPython.parallel.util import ensure_bytes
29
28 30 class Heart(object):
29 31 """A basic heart object for responding to a HeartMonitor.
30 32 This is a simple wrapper with defaults for the most common
@@ -42,9 +44,9 b' class Heart(object):'
42 44 self.device.connect_in(in_addr)
43 45 self.device.connect_out(out_addr)
44 46 if in_type == zmq.SUB:
45 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
46 48 if heart_id is None:
47 heart_id = str(uuid.uuid4())
49 heart_id = ensure_bytes(uuid.uuid4())
48 50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
49 51 self.id = heart_id
50 52
@@ -115,7 +117,7 b' class HeartMonitor(LoggingConfigurable):'
115 117 self.responses = set()
116 118 # print self.on_probation, self.hearts
117 119 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
118 self.pingstream.send(str(self.lifetime))
120 self.pingstream.send(ensure_bytes(str(self.lifetime)))
119 121
120 122 def handle_new_heart(self, heart):
121 123 if self._new_handlers:
@@ -140,11 +142,13 b' class HeartMonitor(LoggingConfigurable):'
140 142
141 143 def handle_pong(self, msg):
142 144 "a heart just beat"
143 if msg[1] == str(self.lifetime):
145 current = ensure_bytes(str(self.lifetime))
146 last = ensure_bytes(str(self.last_ping))
147 if msg[1] == current:
144 148 delta = time.time()-self.tic
145 149 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
146 150 self.responses.add(msg[0])
147 elif msg[1] == str(self.last_ping):
151 elif msg[1] == last:
148 152 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
149 153 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
150 154 self.responses.add(msg[0])
@@ -289,7 +289,7 b' class HubFactory(RegistrationFactory):'
289 289 # resubmit stream
290 290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
291 291 url = util.disambiguate_url(self.client_info['task'][-1])
292 r.setsockopt(zmq.IDENTITY, self.session.session)
292 r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session))
293 293 r.connect(url)
294 294
295 295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -380,14 +380,14 b' class Hub(SessionFactory):'
380 380 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
381 381 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
382 382
383 self.monitor_handlers = { 'in' : self.save_queue_request,
384 'out': self.save_queue_result,
385 'intask': self.save_task_request,
386 'outtask': self.save_task_result,
387 'tracktask': self.save_task_destination,
388 'incontrol': _passer,
389 'outcontrol': _passer,
390 'iopub': self.save_iopub_message,
383 self.monitor_handlers = {b'in' : self.save_queue_request,
384 b'out': self.save_queue_result,
385 b'intask': self.save_task_request,
386 b'outtask': self.save_task_result,
387 b'tracktask': self.save_task_destination,
388 b'incontrol': _passer,
389 b'outcontrol': _passer,
390 b'iopub': self.save_iopub_message,
391 391 }
392 392
393 393 self.query_handlers = {'queue_request': self.queue_status,
@@ -562,8 +562,9 b' class Hub(SessionFactory):'
562 562 return
563 563 record = init_record(msg)
564 564 msg_id = record['msg_id']
565 record['engine_uuid'] = queue_id
566 record['client_uuid'] = client_id
565 # Unicode in records
566 record['engine_uuid'] = queue_id.decode('utf8', 'replace')
567 record['client_uuid'] = client_id.decode('utf8', 'replace')
567 568 record['queue'] = 'mux'
568 569
569 570 try:
@@ -751,7 +752,7 b' class Hub(SessionFactory):'
751 752 # print (content)
752 753 msg_id = content['msg_id']
753 754 engine_uuid = content['engine_id']
754 eid = self.by_ident[engine_uuid]
755 eid = self.by_ident[util.ensure_bytes(engine_uuid)]
755 756
756 757 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
757 758 if msg_id in self.unassigned:
@@ -833,7 +834,7 b' class Hub(SessionFactory):'
833 834 jsonable = {}
834 835 for k,v in self.keytable.iteritems():
835 836 if v not in self.dead_engines:
836 jsonable[str(k)] = v
837 jsonable[str(k)] = v.decode()
837 838 content['engines'] = jsonable
838 839 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839 840
@@ -841,11 +842,13 b' class Hub(SessionFactory):'
841 842 """Register a new engine."""
842 843 content = msg['content']
843 844 try:
844 queue = content['queue']
845 queue = util.ensure_bytes(content['queue'])
845 846 except KeyError:
846 847 self.log.error("registration::queue not specified", exc_info=True)
847 848 return
848 849 heart = content.get('heartbeat', None)
850 if heart:
851 heart = util.ensure_bytes(heart)
849 852 """register a new engine, and create the socket(s) necessary"""
850 853 eid = self._next_id
851 854 # print (eid, queue, reg, heart)
@@ -912,7 +915,7 b' class Hub(SessionFactory):'
912 915 self.log.info("registration::unregister_engine(%r)"%eid)
913 916 # print (eid)
914 917 uuid = self.keytable[eid]
915 content=dict(id=eid, queue=uuid)
918 content=dict(id=eid, queue=uuid.decode())
916 919 self.dead_engines.add(uuid)
917 920 # self.ids.remove(eid)
918 921 # uuid = self.keytable.pop(eid)
@@ -980,7 +983,7 b' class Hub(SessionFactory):'
980 983 self.tasks[eid] = list()
981 984 self.completed[eid] = list()
982 985 self.hearts[heart] = eid
983 content = dict(id=eid, queue=self.engines[eid].queue)
986 content = dict(id=eid, queue=self.engines[eid].queue.decode())
984 987 if self.notifier:
985 988 self.session.send(self.notifier, "registration_notification", content=content)
986 989 self.log.info("engine::Engine Connected: %i"%eid)
@@ -1054,9 +1057,9 b' class Hub(SessionFactory):'
1054 1057 queue = len(queue)
1055 1058 completed = len(completed)
1056 1059 tasks = len(tasks)
1057 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1060 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1058 1061 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1059
1062 # print (content)
1060 1063 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1061 1064
1062 1065 def purge_results(self, client_id, msg):
@@ -1179,7 +1182,7 b' class Hub(SessionFactory):'
1179 1182 'io' : io_dict,
1180 1183 }
1181 1184 if rec['result_buffers']:
1182 buffers = map(str, rec['result_buffers'])
1185 buffers = map(bytes, rec['result_buffers'])
1183 1186 else:
1184 1187 buffers = []
1185 1188
@@ -1281,7 +1284,7 b' class Hub(SessionFactory):'
1281 1284 buffers.extend(rb)
1282 1285 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1283 1286 result_buffer_lens=result_buffer_lens)
1284
1287 # self.log.debug (content)
1285 1288 self.session.send(self.query, "db_reply", content=content,
1286 1289 parent=msg, ident=client_id,
1287 1290 buffers=buffers)
@@ -40,11 +40,11 b' from zmq.eventloop import ioloop, zmqstream'
40 40 from IPython.external.decorator import decorator
41 41 from IPython.config.application import Application
42 42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes
44 44
45 45 from IPython.parallel import error
46 46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger
47 from IPython.parallel.util import connect_logger, local_logger, ensure_bytes
48 48
49 49 from .dependency import Dependency
50 50
@@ -174,6 +174,10 b' class TaskScheduler(SessionFactory):'
174 174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176 176
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 # but ensure Bytes
179 def _ident_default(self):
180 return ensure_bytes(self.session.session)
177 181
178 182 def start(self):
179 183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
@@ -204,7 +208,7 b' class TaskScheduler(SessionFactory):'
204 208 try:
205 209 idents,msg = self.session.feed_identities(msg)
206 210 except ValueError:
207 self.log.warn("task::Invalid Message: %r"%msg)
211 self.log.warn("task::Invalid Message: %r",msg)
208 212 return
209 213 try:
210 214 msg = self.session.unpack_message(msg)
@@ -219,15 +223,16 b' class TaskScheduler(SessionFactory):'
219 223 self.log.error("Unhandled message type: %r"%msg_type)
220 224 else:
221 225 try:
222 handler(str(msg['content']['queue']))
223 except KeyError:
224 self.log.error("task::Invalid notification msg: %r"%msg)
226 handler(ensure_bytes(msg['content']['queue']))
227 except Exception:
228 self.log.error("task::Invalid notification msg: %r",msg)
225 229
226 230 def _register_engine(self, uid):
227 231 """New engine with ident `uid` became available."""
228 232 # head of the line:
229 233 self.targets.insert(0,uid)
230 234 self.loads.insert(0,0)
235
231 236 # initialize sets
232 237 self.completed[uid] = set()
233 238 self.failed[uid] = set()
@@ -309,14 +314,18 b' class TaskScheduler(SessionFactory):'
309 314
310 315
311 316 # send to monitor
312 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
313 318
314 319 header = msg['header']
315 320 msg_id = header['msg_id']
316 321 self.all_ids.add(msg_id)
317 322
318 # targets
319 targets = set(header.get('targets', []))
323 # get targets as a set of bytes objects
324 # from a list of unicode objects
325 targets = header.get('targets', [])
326 targets = map(ensure_bytes, targets)
327 targets = set(targets)
328
320 329 retries = header.get('retries', 0)
321 330 self.retries[msg_id] = retries
322 331
@@ -412,7 +421,7 b' class TaskScheduler(SessionFactory):'
412 421
413 422 msg = self.session.send(self.client_stream, 'apply_reply', content,
414 423 parent=header, ident=idents)
415 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
424 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
416 425
417 426 self.update_graph(msg_id, success=False)
418 427
@@ -494,9 +503,9 b' class TaskScheduler(SessionFactory):'
494 503 self.add_job(idx)
495 504 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
496 505 # notify Hub
497 content = dict(msg_id=msg_id, engine_id=target)
506 content = dict(msg_id=msg_id, engine_id=target.decode())
498 507 self.session.send(self.mon_stream, 'task_destination', content=content,
499 ident=['tracktask',self.session.session])
508 ident=[b'tracktask',self.ident])
500 509
501 510
502 511 #-----------------------------------------------------------------------
@@ -533,7 +542,7 b' class TaskScheduler(SessionFactory):'
533 542 # relay to client and update graph
534 543 self.handle_result(idents, parent, raw_msg, success)
535 544 # send to Hub monitor
536 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
545 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
537 546 else:
538 547 self.handle_unmet_dependency(idents, parent)
539 548
@@ -28,6 +28,12 b' from IPython.utils.jsonutil import date_default, extract_dates, squash_dates'
28 28 # SQLite operators, adapters, and converters
29 29 #-----------------------------------------------------------------------------
30 30
31 try:
32 buffer
33 except NameError:
34 # py3k
35 buffer = memoryview
36
31 37 operators = {
32 38 '$lt' : "<",
33 39 '$gt' : ">",
@@ -54,7 +60,11 b' def _convert_dict(ds):'
54 60 if ds is None:
55 61 return ds
56 62 else:
57 return extract_dates(json.loads(ds))
63 if isinstance(ds, bytes):
64 # If I understand the sqlite doc correctly, this will always be utf8
65 ds = ds.decode('utf8')
66 d = json.loads(ds)
67 return extract_dates(d)
58 68
59 69 def _adapt_bufs(bufs):
60 70 # this is *horrible*
@@ -28,7 +28,7 b' from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode'
28 28
29 29 from IPython.parallel.controller.heartmonitor import Heart
30 30 from IPython.parallel.factory import RegistrationFactory
31 from IPython.parallel.util import disambiguate_url
31 from IPython.parallel.util import disambiguate_url, ensure_bytes
32 32
33 33 from IPython.zmq.session import Message
34 34
@@ -65,7 +65,7 b' class EngineFactory(RegistrationFactory):'
65 65 ctx = self.context
66 66
67 67 reg = ctx.socket(zmq.XREQ)
68 reg.setsockopt(zmq.IDENTITY, self.ident)
68 reg.setsockopt(zmq.IDENTITY, ensure_bytes(self.ident))
69 69 reg.connect(self.url)
70 70 self.registrar = zmqstream.ZMQStream(reg, self.loop)
71 71
@@ -83,7 +83,7 b' class EngineFactory(RegistrationFactory):'
83 83 self._abort_dc.stop()
84 84 ctx = self.context
85 85 loop = self.loop
86 identity = self.ident
86 identity = ensure_bytes(self.ident)
87 87
88 88 idents,msg = self.session.feed_identities(msg)
89 89 msg = Message(self.session.unpack_message(msg))
@@ -35,12 +35,12 b' import zmq'
35 35 from zmq.eventloop import ioloop, zmqstream
36 36
37 37 # Local imports.
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
39 39 from IPython.zmq.completer import KernelCompleter
40 40
41 41 from IPython.parallel.error import wrap_exception
42 42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message
43 from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes
44 44
45 45 def printer(*args):
46 46 pprint(args, stream=sys.__stdout__)
@@ -73,8 +73,14 b' class Kernel(SessionFactory):'
73 73 # kwargs:
74 74 exec_lines = List(Unicode, config=True,
75 75 help="List of lines to execute")
76
76
77 # identities:
77 78 int_id = Int(-1)
79 bident = CBytes()
80 ident = Unicode()
81 def _ident_changed(self, name, old, new):
82 self.bident = ensure_bytes(new)
83
78 84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
79 85
80 86 control_stream = Instance(zmqstream.ZMQStream)
@@ -193,6 +199,8 b' class Kernel(SessionFactory):'
193 199 except:
194 200 self.log.error("Invalid Message", exc_info=True)
195 201 return
202 else:
203 self.log.debug("Control received, %s", msg)
196 204
197 205 header = msg['header']
198 206 msg_id = header['msg_id']
@@ -247,7 +255,7 b' class Kernel(SessionFactory):'
247 255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
248 256 return
249 257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
250 ident='%s.pyin'%self.prefix)
258 ident=ensure_bytes('%s.pyin'%self.prefix))
251 259 started = datetime.now()
252 260 try:
253 261 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -261,7 +269,7 b' class Kernel(SessionFactory):'
261 269 exc_content = self._wrap_exception('execute')
262 270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
263 271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
264 ident='%s.pyerr'%self.prefix)
272 ident=ensure_bytes('%s.pyerr'%self.prefix))
265 273 reply_content = exc_content
266 274 else:
267 275 reply_content = {'status' : 'ok'}
@@ -285,7 +293,6 b' class Kernel(SessionFactory):'
285 293 def apply_request(self, stream, ident, parent):
286 294 # flush previous reply, so this request won't block it
287 295 stream.flush(zmq.POLLOUT)
288
289 296 try:
290 297 content = parent[u'content']
291 298 bufs = parent[u'buffers']
@@ -341,7 +348,7 b' class Kernel(SessionFactory):'
341 348 exc_content = self._wrap_exception('apply')
342 349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
343 350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
344 ident='%s.pyerr'%self.prefix)
351 ident=ensure_bytes('%s.pyerr'%self.prefix))
345 352 reply_content = exc_content
346 353 result_buf = []
347 354
@@ -370,6 +377,8 b' class Kernel(SessionFactory):'
370 377 except:
371 378 self.log.error("Invalid Message", exc_info=True)
372 379 return
380 else:
381 self.log.debug("Message received, %s", msg)
373 382
374 383
375 384 header = msg['header']
@@ -41,9 +41,11 b' def crash():'
41 41 if sys.platform.startswith('win'):
42 42 import ctypes
43 43 ctypes.windll.kernel32.SetErrorMode(0x0002);
44
45 co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
46 (), (), (), '', '', 1, b'')
44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
45 if sys.version_info[0] >= 3:
46 args.insert(1, 0)
47
48 co = types.CodeType(*args)
47 49 exec(co)
48 50
49 51 def wait(n):
@@ -57,7 +57,7 b' class AsyncResultTest(ClusterTestCase):'
57 57
58 58 def test_get_after_error(self):
59 59 ar = self.client[-1].apply_async(lambda : 1/0)
60 ar.wait()
60 ar.wait(10)
61 61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
63 63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
@@ -16,6 +16,8 b' Authors:'
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 from __future__ import division
20
19 21 import time
20 22 from datetime import datetime
21 23 from tempfile import mktemp
@@ -132,7 +134,9 b' class TestClient(ClusterTestCase):'
132 134 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
133 135 allqs = self.client.queue_status()
134 136 self.assertTrue(isinstance(allqs, dict))
135 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
137 intkeys = list(allqs.keys())
138 intkeys.remove('unassigned')
139 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
136 140 unassigned = allqs.pop('unassigned')
137 141 for eid,qs in allqs.items():
138 142 self.assertTrue(isinstance(qs, dict))
@@ -156,7 +160,7 b' class TestClient(ClusterTestCase):'
156 160 def test_db_query_dt(self):
157 161 """test db query by date"""
158 162 hist = self.client.hub_history()
159 middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0]
163 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
160 164 tic = middle['submitted']
161 165 before = self.client.db_query({'submitted' : {'$lt' : tic}})
162 166 after = self.client.db_query({'submitted' : {'$gte' : tic}})
@@ -16,6 +16,7 b' Authors:'
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 from __future__ import division
19 20
20 21 import tempfile
21 22 import time
@@ -100,7 +101,7 b' class TestDictBackend(TestCase):'
100 101 def test_find_records_dt(self):
101 102 """test finding records by date"""
102 103 hist = self.db.get_history()
103 middle = self.db.get_record(hist[len(hist)/2])
104 middle = self.db.get_record(hist[len(hist)//2])
104 105 tic = middle['submitted']
105 106 before = self.db.find_records({'submitted' : {'$lt' : tic}})
106 107 after = self.db.find_records({'submitted' : {'$gte' : tic}})
@@ -168,7 +169,7 b' class TestDictBackend(TestCase):'
168 169 query = {'msg_id' : {'$in':msg_ids}}
169 170 self.db.drop_matching_records(query)
170 171 recs = self.db.find_records(query)
171 self.assertTrue(len(recs)==0)
172 self.assertEquals(len(recs), 0)
172 173
173 174 class TestSQLiteBackend(TestDictBackend):
174 175 def create_db(self):
@@ -43,7 +43,7 b' class TestView(ClusterTestCase):'
43 43 # self.add_engines(1)
44 44 eid = self.client.ids[-1]
45 45 ar = self.client[eid].apply_async(crash)
46 self.assertRaisesRemote(error.EngineError, ar.get)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 47 eid = ar.engine_id
48 48 tic = time.time()
49 49 while eid in self.client.ids and time.time()-tic < 5:
@@ -413,7 +413,10 b' class TestView(ClusterTestCase):'
413 413 """test executing unicode strings"""
414 414 v = self.client[-1]
415 415 v.block=True
416 code=u"a=u'é'"
416 if sys.version_info[0] >= 3:
417 code="a='é'"
418 else:
419 code=u"a=u'é'"
417 420 v.execute(code)
418 421 self.assertEquals(v['a'], u'é')
419 422
@@ -433,7 +436,7 b' class TestView(ClusterTestCase):'
433 436 assert isinstance(check, bytes), "%r is not bytes"%check
434 437 assert a.encode('utf8') == check, "%s != %s"%(a,check)
435 438
436 for s in [ u'é', u'ßø®∫','asdf'.decode() ]:
439 for s in [ u'é', u'ßø®∫',u'asdf' ]:
437 440 try:
438 441 v.apply_sync(check_unicode, s, s.encode('utf8'))
439 442 except error.RemoteError as e:
@@ -101,6 +101,12 b' class ReverseDict(dict):'
101 101 # Functions
102 102 #-----------------------------------------------------------------------------
103 103
104 def ensure_bytes(s):
105 """ensure that an object is bytes"""
106 if isinstance(s, unicode):
107 s = s.encode(sys.getdefaultencoding(), 'replace')
108 return s
109
104 110 def validate_url(url):
105 111 """validate a url for zeromq"""
106 112 if not isinstance(url, basestring):
@@ -23,6 +23,7 b' __docformat__ = "restructuredtext en"'
23 23 # Imports
24 24 #-------------------------------------------------------------------------------
25 25
26 import sys
26 27 import new, types, copy_reg
27 28
28 29 def code_ctor(*args):
@@ -31,9 +32,12 b' def code_ctor(*args):'
31 32 def reduce_code(co):
32 33 if co.co_freevars or co.co_cellvars:
33 34 raise ValueError("Sorry, cannot pickle code objects with closures")
34 return code_ctor, (co.co_argcount, co.co_nlocals, co.co_stacksize,
35 co.co_flags, co.co_code, co.co_consts, co.co_names,
36 co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno,
37 co.co_lnotab)
35 args = [co.co_argcount, co.co_nlocals, co.co_stacksize,
36 co.co_flags, co.co_code, co.co_consts, co.co_names,
37 co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno,
38 co.co_lnotab]
39 if sys.version_info[0] >= 3:
40 args.insert(1, co.co_kwonlyargcount)
41 return code_ctor, tuple(args)
38 42
39 43 copy_reg.pickle(types.CodeType, reduce_code) No newline at end of file
@@ -19,16 +19,23 b' __test__ = {}'
19 19 # Imports
20 20 #-------------------------------------------------------------------------------
21 21
22 import sys
22 23 import cPickle as pickle
23 24
24 25 try:
25 26 import numpy
26 27 except ImportError:
27 pass
28 numpy = None
28 29
29 30 class SerializationError(Exception):
30 31 pass
31 32
33 if sys.version_info[0] >= 3:
34 buffer = memoryview
35 py3k = True
36 else:
37 py3k = False
38
32 39 #-----------------------------------------------------------------------------
33 40 # Classes and functions
34 41 #-----------------------------------------------------------------------------
@@ -93,8 +100,11 b' class SerializeIt(object):'
93 100 def __init__(self, unSerialized):
94 101 self.data = None
95 102 self.obj = unSerialized.getObject()
96 if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
97 if len(self.obj.shape) == 0: # length 0 arrays are just pickled
103 if numpy is not None and isinstance(self.obj, numpy.ndarray):
104 if py3k or len(self.obj.shape) == 0: # length 0 arrays are just pickled
105 # FIXME:
106 # also use pickle for numpy arrays on py3k, since
107 # pyzmq doesn't rebuild from memoryviews properly
98 108 self.typeDescriptor = 'pickle'
99 109 self.metadata = {}
100 110 else:
@@ -102,7 +112,7 b' class SerializeIt(object):'
102 112 self.typeDescriptor = 'ndarray'
103 113 self.metadata = {'shape':self.obj.shape,
104 114 'dtype':self.obj.dtype.str}
105 elif isinstance(self.obj, str):
115 elif isinstance(self.obj, bytes):
106 116 self.typeDescriptor = 'bytes'
107 117 self.metadata = {}
108 118 elif isinstance(self.obj, buffer):
@@ -146,9 +156,9 b' class UnSerializeIt(UnSerialized):'
146 156
147 157 def getObject(self):
148 158 typeDescriptor = self.serialized.getTypeDescriptor()
149 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
159 if numpy is not None and typeDescriptor == 'ndarray':
150 160 buf = self.serialized.getData()
151 if isinstance(buf, (str, buffer)):
161 if isinstance(buf, (bytes, buffer)):
152 162 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
153 163 else:
154 164 # memoryview
General Comments 0
You need to be logged in to leave comments. Login now