Show More
@@ -54,7 +54,7 from IPython.parallel.controller.hub import HubFactory | |||
|
54 | 54 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler |
|
55 | 55 | from IPython.parallel.controller.sqlitedb import SQLiteDB |
|
56 | 56 | |
|
57 | from IPython.parallel.util import signal_children, split_url | |
|
57 | from IPython.parallel.util import signal_children, split_url, ensure_bytes | |
|
58 | 58 | |
|
59 | 59 | # conditional import of MongoDB backend class |
|
60 | 60 | |
@@ -202,14 +202,13 class IPControllerApp(BaseParallelApplication): | |||
|
202 | 202 | # load from engine config |
|
203 | 203 | with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f: |
|
204 | 204 | cfg = json.loads(f.read()) |
|
205 | key = c.Session.key = cfg['exec_key'] | |
|
205 | key = c.Session.key = ensure_bytes(cfg['exec_key']) | |
|
206 | 206 | xport,addr = cfg['url'].split('://') |
|
207 | 207 | c.HubFactory.engine_transport = xport |
|
208 | 208 | ip,ports = addr.split(':') |
|
209 | 209 | c.HubFactory.engine_ip = ip |
|
210 | 210 | c.HubFactory.regport = int(ports) |
|
211 | 211 | self.location = cfg['location'] |
|
212 | ||
|
213 | 212 | # load client config |
|
214 | 213 | with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f: |
|
215 | 214 | cfg = json.loads(f.read()) |
@@ -240,9 +239,9 class IPControllerApp(BaseParallelApplication): | |||
|
240 | 239 | # with open(keyfile, 'w') as f: |
|
241 | 240 | # f.write(key) |
|
242 | 241 | # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) |
|
243 | c.Session.key = key | |
|
242 | c.Session.key = ensure_bytes(key) | |
|
244 | 243 | else: |
|
245 | key = c.Session.key = '' | |
|
244 | key = c.Session.key = b'' | |
|
246 | 245 | |
|
247 | 246 | try: |
|
248 | 247 | self.factory = HubFactory(config=c, log=self.log) |
@@ -273,27 +272,27 class IPControllerApp(BaseParallelApplication): | |||
|
273 | 272 | hub = self.factory |
|
274 | 273 | # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url |
|
275 | 274 | # IOPub relay (in a Process) |
|
276 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') | |
|
275 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') | |
|
277 | 276 | q.bind_in(hub.client_info['iopub']) |
|
278 | 277 | q.bind_out(hub.engine_info['iopub']) |
|
279 | q.setsockopt_out(zmq.SUBSCRIBE, '') | |
|
278 | q.setsockopt_out(zmq.SUBSCRIBE, b'') | |
|
280 | 279 | q.connect_mon(hub.monitor_url) |
|
281 | 280 | q.daemon=True |
|
282 | 281 | children.append(q) |
|
283 | 282 | |
|
284 | 283 | # Multiplexer Queue (in a Process) |
|
285 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |
|
284 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out') | |
|
286 | 285 | q.bind_in(hub.client_info['mux']) |
|
287 | q.setsockopt_in(zmq.IDENTITY, 'mux') | |
|
286 | q.setsockopt_in(zmq.IDENTITY, b'mux') | |
|
288 | 287 | q.bind_out(hub.engine_info['mux']) |
|
289 | 288 | q.connect_mon(hub.monitor_url) |
|
290 | 289 | q.daemon=True |
|
291 | 290 | children.append(q) |
|
292 | 291 | |
|
293 | 292 | # Control Queue (in a Process) |
|
294 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |
|
293 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol') | |
|
295 | 294 | q.bind_in(hub.client_info['control']) |
|
296 | q.setsockopt_in(zmq.IDENTITY, 'control') | |
|
295 | q.setsockopt_in(zmq.IDENTITY, b'control') | |
|
297 | 296 | q.bind_out(hub.engine_info['control']) |
|
298 | 297 | q.connect_mon(hub.monitor_url) |
|
299 | 298 | q.daemon=True |
@@ -305,10 +304,10 class IPControllerApp(BaseParallelApplication): | |||
|
305 | 304 | # Task Queue (in a Process) |
|
306 | 305 | if scheme == 'pure': |
|
307 | 306 | self.log.warn("task::using pure XREQ Task scheduler") |
|
308 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |
|
307 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask') | |
|
309 | 308 | # q.setsockopt_out(zmq.HWM, hub.hwm) |
|
310 | 309 | q.bind_in(hub.client_info['task'][1]) |
|
311 | q.setsockopt_in(zmq.IDENTITY, 'task') | |
|
310 | q.setsockopt_in(zmq.IDENTITY, b'task') | |
|
312 | 311 | q.bind_out(hub.engine_info['task']) |
|
313 | 312 | q.connect_mon(hub.monitor_url) |
|
314 | 313 | q.daemon=True |
@@ -41,7 +41,7 from IPython.config.configurable import Configurable | |||
|
41 | 41 | from IPython.zmq.session import Session |
|
42 | 42 | from IPython.parallel.engine.engine import EngineFactory |
|
43 | 43 | from IPython.parallel.engine.streamkernel import Kernel |
|
44 | from IPython.parallel.util import disambiguate_url | |
|
44 | from IPython.parallel.util import disambiguate_url, ensure_bytes | |
|
45 | 45 | |
|
46 | 46 | from IPython.utils.importstring import import_item |
|
47 | 47 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float |
@@ -216,11 +216,8 class IPEngineApp(BaseParallelApplication): | |||
|
216 | 216 | self.log.info("Loading url_file %r"%self.url_file) |
|
217 | 217 | with open(self.url_file) as f: |
|
218 | 218 | d = json.loads(f.read()) |
|
219 | for k,v in d.iteritems(): | |
|
220 | if isinstance(v, unicode): | |
|
221 | d[k] = v.encode() | |
|
222 | 219 | if d['exec_key']: |
|
223 | config.Session.key = d['exec_key'] | |
|
220 | config.Session.key = ensure_bytes(d['exec_key']) | |
|
224 | 221 | d['url'] = disambiguate_url(d['url'], d['location']) |
|
225 | 222 | config.EngineFactory.url = d['url'] |
|
226 | 223 | config.EngineFactory.location = d['location'] |
@@ -17,6 +17,7 Authors: | |||
|
17 | 17 | |
|
18 | 18 | import os |
|
19 | 19 | import json |
|
20 | import sys | |
|
20 | 21 | import time |
|
21 | 22 | import warnings |
|
22 | 23 | from datetime import datetime |
@@ -48,6 +49,11 from .asyncresult import AsyncResult, AsyncHubResult | |||
|
48 | 49 | from IPython.core.profiledir import ProfileDir, ProfileDirError |
|
49 | 50 | from .view import DirectView, LoadBalancedView |
|
50 | 51 | |
|
52 | if sys.version_info[0] >= 3: | |
|
53 | # xrange is used in a coupe 'isinstance' tests in py2 | |
|
54 | # should be just 'range' in 3k | |
|
55 | xrange = range | |
|
56 | ||
|
51 | 57 | #-------------------------------------------------------------------------- |
|
52 | 58 | # Decorators for Client methods |
|
53 | 59 | #-------------------------------------------------------------------------- |
@@ -356,13 +362,12 class Client(HasTraits): | |||
|
356 | 362 | if os.path.isfile(exec_key): |
|
357 | 363 | extra_args['keyfile'] = exec_key |
|
358 | 364 | else: |
|
359 |
|
|
|
360 | exec_key = exec_key.encode('ascii') | |
|
365 | exec_key = util.ensure_bytes(exec_key) | |
|
361 | 366 | extra_args['key'] = exec_key |
|
362 | 367 | self.session = Session(**extra_args) |
|
363 | 368 | |
|
364 | 369 | self._query_socket = self._context.socket(zmq.XREQ) |
|
365 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
370 | self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) | |
|
366 | 371 | if self._ssh: |
|
367 | 372 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) |
|
368 | 373 | else: |
@@ -404,7 +409,7 class Client(HasTraits): | |||
|
404 | 409 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" |
|
405 | 410 | for k,v in engines.iteritems(): |
|
406 | 411 | eid = int(k) |
|
407 |
self._engines[eid] = |
|
|
412 | self._engines[eid] = v | |
|
408 | 413 | self._ids.append(eid) |
|
409 | 414 | self._ids = sorted(self._ids) |
|
410 | 415 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
@@ -455,7 +460,7 class Client(HasTraits): | |||
|
455 | 460 | if not isinstance(targets, (tuple, list, xrange)): |
|
456 | 461 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) |
|
457 | 462 | |
|
458 | return [self._engines[t] for t in targets], list(targets) | |
|
463 | return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets) | |
|
459 | 464 | |
|
460 | 465 | def _connect(self, sshserver, ssh_kwargs, timeout): |
|
461 | 466 | """setup all our socket connections to the cluster. This is called from |
@@ -488,14 +493,15 class Client(HasTraits): | |||
|
488 | 493 | content = msg.content |
|
489 | 494 | self._config['registration'] = dict(content) |
|
490 | 495 | if content.status == 'ok': |
|
496 | ident = util.ensure_bytes(self.session.session) | |
|
491 | 497 | if content.mux: |
|
492 | 498 | self._mux_socket = self._context.socket(zmq.XREQ) |
|
493 |
self._mux_socket.setsockopt(zmq.IDENTITY, |
|
|
499 | self._mux_socket.setsockopt(zmq.IDENTITY, ident) | |
|
494 | 500 | connect_socket(self._mux_socket, content.mux) |
|
495 | 501 | if content.task: |
|
496 | 502 | self._task_scheme, task_addr = content.task |
|
497 | 503 | self._task_socket = self._context.socket(zmq.XREQ) |
|
498 |
self._task_socket.setsockopt(zmq.IDENTITY, |
|
|
504 | self._task_socket.setsockopt(zmq.IDENTITY, ident) | |
|
499 | 505 | connect_socket(self._task_socket, task_addr) |
|
500 | 506 | if content.notification: |
|
501 | 507 | self._notification_socket = self._context.socket(zmq.SUB) |
@@ -507,12 +513,12 class Client(HasTraits): | |||
|
507 | 513 | # connect_socket(self._query_socket, content.query) |
|
508 | 514 | if content.control: |
|
509 | 515 | self._control_socket = self._context.socket(zmq.XREQ) |
|
510 |
self._control_socket.setsockopt(zmq.IDENTITY, |
|
|
516 | self._control_socket.setsockopt(zmq.IDENTITY, ident) | |
|
511 | 517 | connect_socket(self._control_socket, content.control) |
|
512 | 518 | if content.iopub: |
|
513 | 519 | self._iopub_socket = self._context.socket(zmq.SUB) |
|
514 | 520 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') |
|
515 |
self._iopub_socket.setsockopt(zmq.IDENTITY, |
|
|
521 | self._iopub_socket.setsockopt(zmq.IDENTITY, ident) | |
|
516 | 522 | connect_socket(self._iopub_socket, content.iopub) |
|
517 | 523 | self._update_engines(dict(content.engines)) |
|
518 | 524 | else: |
@@ -13,8 +13,6 Authors: | |||
|
13 | 13 | |
|
14 | 14 | """ |
|
15 | 15 | |
|
16 | __docformat__ = "restructuredtext en" | |
|
17 | ||
|
18 | 16 | #------------------------------------------------------------------------------- |
|
19 | 17 | # Copyright (C) 2008-2011 The IPython Development Team |
|
20 | 18 | # |
@@ -26,6 +24,8 __docformat__ = "restructuredtext en" | |||
|
26 | 24 | # Imports |
|
27 | 25 | #------------------------------------------------------------------------------- |
|
28 | 26 | |
|
27 | from __future__ import division | |
|
28 | ||
|
29 | 29 | import types |
|
30 | 30 | |
|
31 | 31 | from IPython.utils.data import flatten as utils_flatten |
@@ -67,7 +67,7 class Map: | |||
|
67 | 67 | return |
|
68 | 68 | |
|
69 | 69 | remainder = len(seq)%q |
|
70 | basesize = len(seq)/q | |
|
70 | basesize = len(seq)//q | |
|
71 | 71 | hi = [] |
|
72 | 72 | lo = [] |
|
73 | 73 | for n in range(q): |
@@ -16,6 +16,8 Authors: | |||
|
16 | 16 | # Imports |
|
17 | 17 | #----------------------------------------------------------------------------- |
|
18 | 18 | |
|
19 | from __future__ import division | |
|
20 | ||
|
19 | 21 | import warnings |
|
20 | 22 | |
|
21 | 23 | from IPython.testing.skipdoctest import skip_doctest |
@@ -142,7 +144,7 class ParallelFunction(RemoteFunction): | |||
|
142 | 144 | balanced = 'Balanced' in self.view.__class__.__name__ |
|
143 | 145 | if balanced: |
|
144 | 146 | if self.chunksize: |
|
145 | nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0) | |
|
147 | nparts = len_0//self.chunksize + int(len_0%self.chunksize > 0) | |
|
146 | 148 | else: |
|
147 | 149 | nparts = len_0 |
|
148 | 150 | targets = [None]*nparts |
@@ -169,7 +171,10 class ParallelFunction(RemoteFunction): | |||
|
169 | 171 | |
|
170 | 172 | # print (args) |
|
171 | 173 | if hasattr(self, '_map'): |
|
172 | f = map | |
|
174 | if sys.version_info[0] >= 3: | |
|
175 | f = lambda f, *sequences: list(map(f, *sequences)) | |
|
176 | else: | |
|
177 | f = map | |
|
173 | 178 | args = [self.func]+args |
|
174 | 179 | else: |
|
175 | 180 | f=self.func |
@@ -969,6 +969,8 class LoadBalancedView(View): | |||
|
969 | 969 | idents = [] |
|
970 | 970 | else: |
|
971 | 971 | idents = self.client._build_targets(targets)[0] |
|
972 | # ensure *not* bytes | |
|
973 | idents = [ ident.decode() for ident in idents ] | |
|
972 | 974 | |
|
973 | 975 | after = self._render_dependency(after) |
|
974 | 976 | follow = self._render_dependency(follow) |
@@ -25,6 +25,8 from zmq.eventloop import ioloop, zmqstream | |||
|
25 | 25 | from IPython.config.configurable import LoggingConfigurable |
|
26 | 26 | from IPython.utils.traitlets import Set, Instance, CFloat |
|
27 | 27 | |
|
28 | from IPython.parallel.util import ensure_bytes | |
|
29 | ||
|
28 | 30 | class Heart(object): |
|
29 | 31 | """A basic heart object for responding to a HeartMonitor. |
|
30 | 32 | This is a simple wrapper with defaults for the most common |
@@ -42,9 +44,9 class Heart(object): | |||
|
42 | 44 | self.device.connect_in(in_addr) |
|
43 | 45 | self.device.connect_out(out_addr) |
|
44 | 46 | if in_type == zmq.SUB: |
|
45 | self.device.setsockopt_in(zmq.SUBSCRIBE, "") | |
|
47 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") | |
|
46 | 48 | if heart_id is None: |
|
47 |
heart_id = |
|
|
49 | heart_id = ensure_bytes(uuid.uuid4()) | |
|
48 | 50 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) |
|
49 | 51 | self.id = heart_id |
|
50 | 52 | |
@@ -115,7 +117,7 class HeartMonitor(LoggingConfigurable): | |||
|
115 | 117 | self.responses = set() |
|
116 | 118 | # print self.on_probation, self.hearts |
|
117 | 119 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) |
|
118 | self.pingstream.send(str(self.lifetime)) | |
|
120 | self.pingstream.send(ensure_bytes(str(self.lifetime))) | |
|
119 | 121 | |
|
120 | 122 | def handle_new_heart(self, heart): |
|
121 | 123 | if self._new_handlers: |
@@ -140,11 +142,13 class HeartMonitor(LoggingConfigurable): | |||
|
140 | 142 | |
|
141 | 143 | def handle_pong(self, msg): |
|
142 | 144 | "a heart just beat" |
|
143 |
|
|
|
145 | current = ensure_bytes(str(self.lifetime)) | |
|
146 | last = ensure_bytes(str(self.last_ping)) | |
|
147 | if msg[1] == current: | |
|
144 | 148 | delta = time.time()-self.tic |
|
145 | 149 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
146 | 150 | self.responses.add(msg[0]) |
|
147 |
elif msg[1] == |
|
|
151 | elif msg[1] == last: | |
|
148 | 152 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
|
149 | 153 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
150 | 154 | self.responses.add(msg[0]) |
@@ -289,7 +289,7 class HubFactory(RegistrationFactory): | |||
|
289 | 289 | # resubmit stream |
|
290 | 290 | r = ZMQStream(ctx.socket(zmq.XREQ), loop) |
|
291 | 291 | url = util.disambiguate_url(self.client_info['task'][-1]) |
|
292 | r.setsockopt(zmq.IDENTITY, self.session.session) | |
|
292 | r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) | |
|
293 | 293 | r.connect(url) |
|
294 | 294 | |
|
295 | 295 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
@@ -380,14 +380,14 class Hub(SessionFactory): | |||
|
380 | 380 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) |
|
381 | 381 | self.heartmonitor.add_new_heart_handler(self.handle_new_heart) |
|
382 | 382 | |
|
383 |
self.monitor_handlers = { |
|
|
384 | 'out': self.save_queue_result, | |
|
385 | 'intask': self.save_task_request, | |
|
386 | 'outtask': self.save_task_result, | |
|
387 | 'tracktask': self.save_task_destination, | |
|
388 | 'incontrol': _passer, | |
|
389 | 'outcontrol': _passer, | |
|
390 | 'iopub': self.save_iopub_message, | |
|
383 | self.monitor_handlers = {b'in' : self.save_queue_request, | |
|
384 | b'out': self.save_queue_result, | |
|
385 | b'intask': self.save_task_request, | |
|
386 | b'outtask': self.save_task_result, | |
|
387 | b'tracktask': self.save_task_destination, | |
|
388 | b'incontrol': _passer, | |
|
389 | b'outcontrol': _passer, | |
|
390 | b'iopub': self.save_iopub_message, | |
|
391 | 391 | } |
|
392 | 392 | |
|
393 | 393 | self.query_handlers = {'queue_request': self.queue_status, |
@@ -562,8 +562,9 class Hub(SessionFactory): | |||
|
562 | 562 | return |
|
563 | 563 | record = init_record(msg) |
|
564 | 564 | msg_id = record['msg_id'] |
|
565 | record['engine_uuid'] = queue_id | |
|
566 | record['client_uuid'] = client_id | |
|
565 | # Unicode in records | |
|
566 | record['engine_uuid'] = queue_id.decode('utf8', 'replace') | |
|
567 | record['client_uuid'] = client_id.decode('utf8', 'replace') | |
|
567 | 568 | record['queue'] = 'mux' |
|
568 | 569 | |
|
569 | 570 | try: |
@@ -751,7 +752,7 class Hub(SessionFactory): | |||
|
751 | 752 | # print (content) |
|
752 | 753 | msg_id = content['msg_id'] |
|
753 | 754 | engine_uuid = content['engine_id'] |
|
754 | eid = self.by_ident[engine_uuid] | |
|
755 | eid = self.by_ident[util.ensure_bytes(engine_uuid)] | |
|
755 | 756 | |
|
756 | 757 | self.log.info("task::task %r arrived on %r"%(msg_id, eid)) |
|
757 | 758 | if msg_id in self.unassigned: |
@@ -833,7 +834,7 class Hub(SessionFactory): | |||
|
833 | 834 | jsonable = {} |
|
834 | 835 | for k,v in self.keytable.iteritems(): |
|
835 | 836 | if v not in self.dead_engines: |
|
836 | jsonable[str(k)] = v | |
|
837 | jsonable[str(k)] = v.decode() | |
|
837 | 838 | content['engines'] = jsonable |
|
838 | 839 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
839 | 840 | |
@@ -841,11 +842,13 class Hub(SessionFactory): | |||
|
841 | 842 | """Register a new engine.""" |
|
842 | 843 | content = msg['content'] |
|
843 | 844 | try: |
|
844 | queue = content['queue'] | |
|
845 | queue = util.ensure_bytes(content['queue']) | |
|
845 | 846 | except KeyError: |
|
846 | 847 | self.log.error("registration::queue not specified", exc_info=True) |
|
847 | 848 | return |
|
848 | 849 | heart = content.get('heartbeat', None) |
|
850 | if heart: | |
|
851 | heart = util.ensure_bytes(heart) | |
|
849 | 852 | """register a new engine, and create the socket(s) necessary""" |
|
850 | 853 | eid = self._next_id |
|
851 | 854 | # print (eid, queue, reg, heart) |
@@ -912,7 +915,7 class Hub(SessionFactory): | |||
|
912 | 915 | self.log.info("registration::unregister_engine(%r)"%eid) |
|
913 | 916 | # print (eid) |
|
914 | 917 | uuid = self.keytable[eid] |
|
915 | content=dict(id=eid, queue=uuid) | |
|
918 | content=dict(id=eid, queue=uuid.decode()) | |
|
916 | 919 | self.dead_engines.add(uuid) |
|
917 | 920 | # self.ids.remove(eid) |
|
918 | 921 | # uuid = self.keytable.pop(eid) |
@@ -980,7 +983,7 class Hub(SessionFactory): | |||
|
980 | 983 | self.tasks[eid] = list() |
|
981 | 984 | self.completed[eid] = list() |
|
982 | 985 | self.hearts[heart] = eid |
|
983 | content = dict(id=eid, queue=self.engines[eid].queue) | |
|
986 | content = dict(id=eid, queue=self.engines[eid].queue.decode()) | |
|
984 | 987 | if self.notifier: |
|
985 | 988 | self.session.send(self.notifier, "registration_notification", content=content) |
|
986 | 989 | self.log.info("engine::Engine Connected: %i"%eid) |
@@ -1054,9 +1057,9 class Hub(SessionFactory): | |||
|
1054 | 1057 | queue = len(queue) |
|
1055 | 1058 | completed = len(completed) |
|
1056 | 1059 | tasks = len(tasks) |
|
1057 |
content[ |
|
|
1060 | content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} | |
|
1058 | 1061 | content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) |
|
1059 | ||
|
1062 | # print (content) | |
|
1060 | 1063 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) |
|
1061 | 1064 | |
|
1062 | 1065 | def purge_results(self, client_id, msg): |
@@ -1179,7 +1182,7 class Hub(SessionFactory): | |||
|
1179 | 1182 | 'io' : io_dict, |
|
1180 | 1183 | } |
|
1181 | 1184 | if rec['result_buffers']: |
|
1182 |
buffers = map( |
|
|
1185 | buffers = map(bytes, rec['result_buffers']) | |
|
1183 | 1186 | else: |
|
1184 | 1187 | buffers = [] |
|
1185 | 1188 | |
@@ -1281,7 +1284,7 class Hub(SessionFactory): | |||
|
1281 | 1284 | buffers.extend(rb) |
|
1282 | 1285 | content = dict(status='ok', records=records, buffer_lens=buffer_lens, |
|
1283 | 1286 | result_buffer_lens=result_buffer_lens) |
|
1284 | ||
|
1287 | # self.log.debug (content) | |
|
1285 | 1288 | self.session.send(self.query, "db_reply", content=content, |
|
1286 | 1289 | parent=msg, ident=client_id, |
|
1287 | 1290 | buffers=buffers) |
@@ -40,11 +40,11 from zmq.eventloop import ioloop, zmqstream | |||
|
40 | 40 | from IPython.external.decorator import decorator |
|
41 | 41 | from IPython.config.application import Application |
|
42 | 42 | from IPython.config.loader import Config |
|
43 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum | |
|
43 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes | |
|
44 | 44 | |
|
45 | 45 | from IPython.parallel import error |
|
46 | 46 | from IPython.parallel.factory import SessionFactory |
|
47 | from IPython.parallel.util import connect_logger, local_logger | |
|
47 | from IPython.parallel.util import connect_logger, local_logger, ensure_bytes | |
|
48 | 48 | |
|
49 | 49 | from .dependency import Dependency |
|
50 | 50 | |
@@ -174,6 +174,10 class TaskScheduler(SessionFactory): | |||
|
174 | 174 | blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency |
|
175 | 175 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') |
|
176 | 176 | |
|
177 | ident = CBytes() # ZMQ identity. This should just be self.session.session | |
|
178 | # but ensure Bytes | |
|
179 | def _ident_default(self): | |
|
180 | return ensure_bytes(self.session.session) | |
|
177 | 181 | |
|
178 | 182 | def start(self): |
|
179 | 183 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
@@ -204,7 +208,7 class TaskScheduler(SessionFactory): | |||
|
204 | 208 | try: |
|
205 | 209 | idents,msg = self.session.feed_identities(msg) |
|
206 | 210 | except ValueError: |
|
207 |
self.log.warn("task::Invalid Message: %r" |
|
|
211 | self.log.warn("task::Invalid Message: %r",msg) | |
|
208 | 212 | return |
|
209 | 213 | try: |
|
210 | 214 | msg = self.session.unpack_message(msg) |
@@ -219,15 +223,16 class TaskScheduler(SessionFactory): | |||
|
219 | 223 | self.log.error("Unhandled message type: %r"%msg_type) |
|
220 | 224 | else: |
|
221 | 225 | try: |
|
222 |
handler( |
|
|
223 |
except |
|
|
224 |
self.log.error("task::Invalid notification msg: %r" |
|
|
226 | handler(ensure_bytes(msg['content']['queue'])) | |
|
227 | except Exception: | |
|
228 | self.log.error("task::Invalid notification msg: %r",msg) | |
|
225 | 229 | |
|
226 | 230 | def _register_engine(self, uid): |
|
227 | 231 | """New engine with ident `uid` became available.""" |
|
228 | 232 | # head of the line: |
|
229 | 233 | self.targets.insert(0,uid) |
|
230 | 234 | self.loads.insert(0,0) |
|
235 | ||
|
231 | 236 | # initialize sets |
|
232 | 237 | self.completed[uid] = set() |
|
233 | 238 | self.failed[uid] = set() |
@@ -309,14 +314,18 class TaskScheduler(SessionFactory): | |||
|
309 | 314 | |
|
310 | 315 | |
|
311 | 316 | # send to monitor |
|
312 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) | |
|
317 | self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) | |
|
313 | 318 | |
|
314 | 319 | header = msg['header'] |
|
315 | 320 | msg_id = header['msg_id'] |
|
316 | 321 | self.all_ids.add(msg_id) |
|
317 | 322 | |
|
318 | # targets | |
|
319 | targets = set(header.get('targets', [])) | |
|
323 | # get targets as a set of bytes objects | |
|
324 | # from a list of unicode objects | |
|
325 | targets = header.get('targets', []) | |
|
326 | targets = map(ensure_bytes, targets) | |
|
327 | targets = set(targets) | |
|
328 | ||
|
320 | 329 | retries = header.get('retries', 0) |
|
321 | 330 | self.retries[msg_id] = retries |
|
322 | 331 | |
@@ -412,7 +421,7 class TaskScheduler(SessionFactory): | |||
|
412 | 421 | |
|
413 | 422 | msg = self.session.send(self.client_stream, 'apply_reply', content, |
|
414 | 423 | parent=header, ident=idents) |
|
415 | self.session.send(self.mon_stream, msg, ident=['outtask']+idents) | |
|
424 | self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents) | |
|
416 | 425 | |
|
417 | 426 | self.update_graph(msg_id, success=False) |
|
418 | 427 | |
@@ -494,9 +503,9 class TaskScheduler(SessionFactory): | |||
|
494 | 503 | self.add_job(idx) |
|
495 | 504 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) |
|
496 | 505 | # notify Hub |
|
497 | content = dict(msg_id=msg_id, engine_id=target) | |
|
506 | content = dict(msg_id=msg_id, engine_id=target.decode()) | |
|
498 | 507 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
499 |
ident=['tracktask',self. |
|
|
508 | ident=[b'tracktask',self.ident]) | |
|
500 | 509 | |
|
501 | 510 | |
|
502 | 511 | #----------------------------------------------------------------------- |
@@ -533,7 +542,7 class TaskScheduler(SessionFactory): | |||
|
533 | 542 | # relay to client and update graph |
|
534 | 543 | self.handle_result(idents, parent, raw_msg, success) |
|
535 | 544 | # send to Hub monitor |
|
536 | self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False) | |
|
545 | self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False) | |
|
537 | 546 | else: |
|
538 | 547 | self.handle_unmet_dependency(idents, parent) |
|
539 | 548 |
@@ -28,6 +28,12 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates | |||
|
28 | 28 | # SQLite operators, adapters, and converters |
|
29 | 29 | #----------------------------------------------------------------------------- |
|
30 | 30 | |
|
31 | try: | |
|
32 | buffer | |
|
33 | except NameError: | |
|
34 | # py3k | |
|
35 | buffer = memoryview | |
|
36 | ||
|
31 | 37 | operators = { |
|
32 | 38 | '$lt' : "<", |
|
33 | 39 | '$gt' : ">", |
@@ -54,7 +60,11 def _convert_dict(ds): | |||
|
54 | 60 | if ds is None: |
|
55 | 61 | return ds |
|
56 | 62 | else: |
|
57 | return extract_dates(json.loads(ds)) | |
|
63 | if isinstance(ds, bytes): | |
|
64 | # If I understand the sqlite doc correctly, this will always be utf8 | |
|
65 | ds = ds.decode('utf8') | |
|
66 | d = json.loads(ds) | |
|
67 | return extract_dates(d) | |
|
58 | 68 | |
|
59 | 69 | def _adapt_bufs(bufs): |
|
60 | 70 | # this is *horrible* |
@@ -28,7 +28,7 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode | |||
|
28 | 28 | |
|
29 | 29 | from IPython.parallel.controller.heartmonitor import Heart |
|
30 | 30 | from IPython.parallel.factory import RegistrationFactory |
|
31 | from IPython.parallel.util import disambiguate_url | |
|
31 | from IPython.parallel.util import disambiguate_url, ensure_bytes | |
|
32 | 32 | |
|
33 | 33 | from IPython.zmq.session import Message |
|
34 | 34 | |
@@ -65,7 +65,7 class EngineFactory(RegistrationFactory): | |||
|
65 | 65 | ctx = self.context |
|
66 | 66 | |
|
67 | 67 | reg = ctx.socket(zmq.XREQ) |
|
68 | reg.setsockopt(zmq.IDENTITY, self.ident) | |
|
68 | reg.setsockopt(zmq.IDENTITY, ensure_bytes(self.ident)) | |
|
69 | 69 | reg.connect(self.url) |
|
70 | 70 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
71 | 71 | |
@@ -83,7 +83,7 class EngineFactory(RegistrationFactory): | |||
|
83 | 83 | self._abort_dc.stop() |
|
84 | 84 | ctx = self.context |
|
85 | 85 | loop = self.loop |
|
86 | identity = self.ident | |
|
86 | identity = ensure_bytes(self.ident) | |
|
87 | 87 | |
|
88 | 88 | idents,msg = self.session.feed_identities(msg) |
|
89 | 89 | msg = Message(self.session.unpack_message(msg)) |
@@ -35,12 +35,12 import zmq | |||
|
35 | 35 | from zmq.eventloop import ioloop, zmqstream |
|
36 | 36 | |
|
37 | 37 | # Local imports. |
|
38 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode | |
|
38 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes | |
|
39 | 39 | from IPython.zmq.completer import KernelCompleter |
|
40 | 40 | |
|
41 | 41 | from IPython.parallel.error import wrap_exception |
|
42 | 42 | from IPython.parallel.factory import SessionFactory |
|
43 | from IPython.parallel.util import serialize_object, unpack_apply_message | |
|
43 | from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes | |
|
44 | 44 | |
|
45 | 45 | def printer(*args): |
|
46 | 46 | pprint(args, stream=sys.__stdout__) |
@@ -73,8 +73,14 class Kernel(SessionFactory): | |||
|
73 | 73 | # kwargs: |
|
74 | 74 | exec_lines = List(Unicode, config=True, |
|
75 | 75 | help="List of lines to execute") |
|
76 | ||
|
76 | ||
|
77 | # identities: | |
|
77 | 78 | int_id = Int(-1) |
|
79 | bident = CBytes() | |
|
80 | ident = Unicode() | |
|
81 | def _ident_changed(self, name, old, new): | |
|
82 | self.bident = ensure_bytes(new) | |
|
83 | ||
|
78 | 84 | user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""") |
|
79 | 85 | |
|
80 | 86 | control_stream = Instance(zmqstream.ZMQStream) |
@@ -193,6 +199,8 class Kernel(SessionFactory): | |||
|
193 | 199 | except: |
|
194 | 200 | self.log.error("Invalid Message", exc_info=True) |
|
195 | 201 | return |
|
202 | else: | |
|
203 | self.log.debug("Control received, %s", msg) | |
|
196 | 204 | |
|
197 | 205 | header = msg['header'] |
|
198 | 206 | msg_id = header['msg_id'] |
@@ -247,7 +255,7 class Kernel(SessionFactory): | |||
|
247 | 255 | self.log.error("Got bad msg: %s"%parent, exc_info=True) |
|
248 | 256 | return |
|
249 | 257 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, |
|
250 | ident='%s.pyin'%self.prefix) | |
|
258 | ident=ensure_bytes('%s.pyin'%self.prefix)) | |
|
251 | 259 | started = datetime.now() |
|
252 | 260 | try: |
|
253 | 261 | comp_code = self.compiler(code, '<zmq-kernel>') |
@@ -261,7 +269,7 class Kernel(SessionFactory): | |||
|
261 | 269 | exc_content = self._wrap_exception('execute') |
|
262 | 270 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
263 | 271 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
264 | ident='%s.pyerr'%self.prefix) | |
|
272 | ident=ensure_bytes('%s.pyerr'%self.prefix)) | |
|
265 | 273 | reply_content = exc_content |
|
266 | 274 | else: |
|
267 | 275 | reply_content = {'status' : 'ok'} |
@@ -285,7 +293,6 class Kernel(SessionFactory): | |||
|
285 | 293 | def apply_request(self, stream, ident, parent): |
|
286 | 294 | # flush previous reply, so this request won't block it |
|
287 | 295 | stream.flush(zmq.POLLOUT) |
|
288 | ||
|
289 | 296 | try: |
|
290 | 297 | content = parent[u'content'] |
|
291 | 298 | bufs = parent[u'buffers'] |
@@ -341,7 +348,7 class Kernel(SessionFactory): | |||
|
341 | 348 | exc_content = self._wrap_exception('apply') |
|
342 | 349 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
343 | 350 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
344 | ident='%s.pyerr'%self.prefix) | |
|
351 | ident=ensure_bytes('%s.pyerr'%self.prefix)) | |
|
345 | 352 | reply_content = exc_content |
|
346 | 353 | result_buf = [] |
|
347 | 354 | |
@@ -370,6 +377,8 class Kernel(SessionFactory): | |||
|
370 | 377 | except: |
|
371 | 378 | self.log.error("Invalid Message", exc_info=True) |
|
372 | 379 | return |
|
380 | else: | |
|
381 | self.log.debug("Message received, %s", msg) | |
|
373 | 382 | |
|
374 | 383 | |
|
375 | 384 | header = msg['header'] |
@@ -41,9 +41,11 def crash(): | |||
|
41 | 41 | if sys.platform.startswith('win'): |
|
42 | 42 | import ctypes |
|
43 | 43 | ctypes.windll.kernel32.SetErrorMode(0x0002); |
|
44 | ||
|
45 | co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00', | |
|
46 | (), (), (), '', '', 1, b'') | |
|
44 | args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b''] | |
|
45 | if sys.version_info[0] >= 3: | |
|
46 | args.insert(1, 0) | |
|
47 | ||
|
48 | co = types.CodeType(*args) | |
|
47 | 49 | exec(co) |
|
48 | 50 | |
|
49 | 51 | def wait(n): |
@@ -57,7 +57,7 class AsyncResultTest(ClusterTestCase): | |||
|
57 | 57 | |
|
58 | 58 | def test_get_after_error(self): |
|
59 | 59 | ar = self.client[-1].apply_async(lambda : 1/0) |
|
60 | ar.wait() | |
|
60 | ar.wait(10) | |
|
61 | 61 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
|
62 | 62 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
|
63 | 63 | self.assertRaisesRemote(ZeroDivisionError, ar.get_dict) |
@@ -16,6 +16,8 Authors: | |||
|
16 | 16 | # Imports |
|
17 | 17 | #------------------------------------------------------------------------------- |
|
18 | 18 | |
|
19 | from __future__ import division | |
|
20 | ||
|
19 | 21 | import time |
|
20 | 22 | from datetime import datetime |
|
21 | 23 | from tempfile import mktemp |
@@ -132,7 +134,9 class TestClient(ClusterTestCase): | |||
|
132 | 134 | self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) |
|
133 | 135 | allqs = self.client.queue_status() |
|
134 | 136 | self.assertTrue(isinstance(allqs, dict)) |
|
135 | self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned'])) | |
|
137 | intkeys = list(allqs.keys()) | |
|
138 | intkeys.remove('unassigned') | |
|
139 | self.assertEquals(sorted(intkeys), sorted(self.client.ids)) | |
|
136 | 140 | unassigned = allqs.pop('unassigned') |
|
137 | 141 | for eid,qs in allqs.items(): |
|
138 | 142 | self.assertTrue(isinstance(qs, dict)) |
@@ -156,7 +160,7 class TestClient(ClusterTestCase): | |||
|
156 | 160 | def test_db_query_dt(self): |
|
157 | 161 | """test db query by date""" |
|
158 | 162 | hist = self.client.hub_history() |
|
159 | middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0] | |
|
163 | middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0] | |
|
160 | 164 | tic = middle['submitted'] |
|
161 | 165 | before = self.client.db_query({'submitted' : {'$lt' : tic}}) |
|
162 | 166 | after = self.client.db_query({'submitted' : {'$gte' : tic}}) |
@@ -16,6 +16,7 Authors: | |||
|
16 | 16 | # Imports |
|
17 | 17 | #------------------------------------------------------------------------------- |
|
18 | 18 | |
|
19 | from __future__ import division | |
|
19 | 20 | |
|
20 | 21 | import tempfile |
|
21 | 22 | import time |
@@ -100,7 +101,7 class TestDictBackend(TestCase): | |||
|
100 | 101 | def test_find_records_dt(self): |
|
101 | 102 | """test finding records by date""" |
|
102 | 103 | hist = self.db.get_history() |
|
103 | middle = self.db.get_record(hist[len(hist)/2]) | |
|
104 | middle = self.db.get_record(hist[len(hist)//2]) | |
|
104 | 105 | tic = middle['submitted'] |
|
105 | 106 | before = self.db.find_records({'submitted' : {'$lt' : tic}}) |
|
106 | 107 | after = self.db.find_records({'submitted' : {'$gte' : tic}}) |
@@ -168,7 +169,7 class TestDictBackend(TestCase): | |||
|
168 | 169 | query = {'msg_id' : {'$in':msg_ids}} |
|
169 | 170 | self.db.drop_matching_records(query) |
|
170 | 171 | recs = self.db.find_records(query) |
|
171 |
self.assert |
|
|
172 | self.assertEquals(len(recs), 0) | |
|
172 | 173 | |
|
173 | 174 | class TestSQLiteBackend(TestDictBackend): |
|
174 | 175 | def create_db(self): |
@@ -43,7 +43,7 class TestView(ClusterTestCase): | |||
|
43 | 43 | # self.add_engines(1) |
|
44 | 44 | eid = self.client.ids[-1] |
|
45 | 45 | ar = self.client[eid].apply_async(crash) |
|
46 | self.assertRaisesRemote(error.EngineError, ar.get) | |
|
46 | self.assertRaisesRemote(error.EngineError, ar.get, 10) | |
|
47 | 47 | eid = ar.engine_id |
|
48 | 48 | tic = time.time() |
|
49 | 49 | while eid in self.client.ids and time.time()-tic < 5: |
@@ -413,7 +413,10 class TestView(ClusterTestCase): | |||
|
413 | 413 | """test executing unicode strings""" |
|
414 | 414 | v = self.client[-1] |
|
415 | 415 | v.block=True |
|
416 | code=u"a=u'é'" | |
|
416 | if sys.version_info[0] >= 3: | |
|
417 | code="a='é'" | |
|
418 | else: | |
|
419 | code=u"a=u'é'" | |
|
417 | 420 | v.execute(code) |
|
418 | 421 | self.assertEquals(v['a'], u'é') |
|
419 | 422 | |
@@ -433,7 +436,7 class TestView(ClusterTestCase): | |||
|
433 | 436 | assert isinstance(check, bytes), "%r is not bytes"%check |
|
434 | 437 | assert a.encode('utf8') == check, "%s != %s"%(a,check) |
|
435 | 438 | |
|
436 |
for s in [ u'é', u'ßø®∫','asdf' |
|
|
439 | for s in [ u'é', u'ßø®∫',u'asdf' ]: | |
|
437 | 440 | try: |
|
438 | 441 | v.apply_sync(check_unicode, s, s.encode('utf8')) |
|
439 | 442 | except error.RemoteError as e: |
@@ -101,6 +101,12 class ReverseDict(dict): | |||
|
101 | 101 | # Functions |
|
102 | 102 | #----------------------------------------------------------------------------- |
|
103 | 103 | |
|
104 | def ensure_bytes(s): | |
|
105 | """ensure that an object is bytes""" | |
|
106 | if isinstance(s, unicode): | |
|
107 | s = s.encode(sys.getdefaultencoding(), 'replace') | |
|
108 | return s | |
|
109 | ||
|
104 | 110 | def validate_url(url): |
|
105 | 111 | """validate a url for zeromq""" |
|
106 | 112 | if not isinstance(url, basestring): |
@@ -23,6 +23,7 __docformat__ = "restructuredtext en" | |||
|
23 | 23 | # Imports |
|
24 | 24 | #------------------------------------------------------------------------------- |
|
25 | 25 | |
|
26 | import sys | |
|
26 | 27 | import new, types, copy_reg |
|
27 | 28 | |
|
28 | 29 | def code_ctor(*args): |
@@ -31,9 +32,12 def code_ctor(*args): | |||
|
31 | 32 | def reduce_code(co): |
|
32 | 33 | if co.co_freevars or co.co_cellvars: |
|
33 | 34 | raise ValueError("Sorry, cannot pickle code objects with closures") |
|
34 |
|
|
|
35 | co.co_flags, co.co_code, co.co_consts, co.co_names, | |
|
36 | co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, | |
|
37 |
co.co_lnotab |
|
|
35 | args = [co.co_argcount, co.co_nlocals, co.co_stacksize, | |
|
36 | co.co_flags, co.co_code, co.co_consts, co.co_names, | |
|
37 | co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, | |
|
38 | co.co_lnotab] | |
|
39 | if sys.version_info[0] >= 3: | |
|
40 | args.insert(1, co.co_kwonlyargcount) | |
|
41 | return code_ctor, tuple(args) | |
|
38 | 42 | |
|
39 | 43 | copy_reg.pickle(types.CodeType, reduce_code) No newline at end of file |
@@ -19,16 +19,23 __test__ = {} | |||
|
19 | 19 | # Imports |
|
20 | 20 | #------------------------------------------------------------------------------- |
|
21 | 21 | |
|
22 | import sys | |
|
22 | 23 | import cPickle as pickle |
|
23 | 24 | |
|
24 | 25 | try: |
|
25 | 26 | import numpy |
|
26 | 27 | except ImportError: |
|
27 | pass | |
|
28 | numpy = None | |
|
28 | 29 | |
|
29 | 30 | class SerializationError(Exception): |
|
30 | 31 | pass |
|
31 | 32 | |
|
33 | if sys.version_info[0] >= 3: | |
|
34 | buffer = memoryview | |
|
35 | py3k = True | |
|
36 | else: | |
|
37 | py3k = False | |
|
38 | ||
|
32 | 39 | #----------------------------------------------------------------------------- |
|
33 | 40 | # Classes and functions |
|
34 | 41 | #----------------------------------------------------------------------------- |
@@ -93,8 +100,11 class SerializeIt(object): | |||
|
93 | 100 | def __init__(self, unSerialized): |
|
94 | 101 | self.data = None |
|
95 | 102 | self.obj = unSerialized.getObject() |
|
96 |
if |
|
|
97 | if len(self.obj.shape) == 0: # length 0 arrays are just pickled | |
|
103 | if numpy is not None and isinstance(self.obj, numpy.ndarray): | |
|
104 | if py3k or len(self.obj.shape) == 0: # length 0 arrays are just pickled | |
|
105 | # FIXME: | |
|
106 | # also use pickle for numpy arrays on py3k, since | |
|
107 | # pyzmq doesn't rebuild from memoryviews properly | |
|
98 | 108 | self.typeDescriptor = 'pickle' |
|
99 | 109 | self.metadata = {} |
|
100 | 110 | else: |
@@ -102,7 +112,7 class SerializeIt(object): | |||
|
102 | 112 | self.typeDescriptor = 'ndarray' |
|
103 | 113 | self.metadata = {'shape':self.obj.shape, |
|
104 | 114 | 'dtype':self.obj.dtype.str} |
|
105 |
elif isinstance(self.obj, |
|
|
115 | elif isinstance(self.obj, bytes): | |
|
106 | 116 | self.typeDescriptor = 'bytes' |
|
107 | 117 | self.metadata = {} |
|
108 | 118 | elif isinstance(self.obj, buffer): |
@@ -146,9 +156,9 class UnSerializeIt(UnSerialized): | |||
|
146 | 156 | |
|
147 | 157 | def getObject(self): |
|
148 | 158 | typeDescriptor = self.serialized.getTypeDescriptor() |
|
149 |
if |
|
|
159 | if numpy is not None and typeDescriptor == 'ndarray': | |
|
150 | 160 | buf = self.serialized.getData() |
|
151 |
if isinstance(buf, ( |
|
|
161 | if isinstance(buf, (bytes, buffer)): | |
|
152 | 162 | result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype']) |
|
153 | 163 | else: |
|
154 | 164 | # memoryview |
General Comments 0
You need to be logged in to leave comments.
Login now