Show More
@@ -54,7 +54,7 from IPython.parallel.controller.hub import HubFactory | |||||
54 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler |
|
54 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler | |
55 | from IPython.parallel.controller.sqlitedb import SQLiteDB |
|
55 | from IPython.parallel.controller.sqlitedb import SQLiteDB | |
56 |
|
56 | |||
57 | from IPython.parallel.util import signal_children, split_url |
|
57 | from IPython.parallel.util import signal_children, split_url, ensure_bytes | |
58 |
|
58 | |||
59 | # conditional import of MongoDB backend class |
|
59 | # conditional import of MongoDB backend class | |
60 |
|
60 | |||
@@ -202,14 +202,13 class IPControllerApp(BaseParallelApplication): | |||||
202 | # load from engine config |
|
202 | # load from engine config | |
203 | with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f: |
|
203 | with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f: | |
204 | cfg = json.loads(f.read()) |
|
204 | cfg = json.loads(f.read()) | |
205 | key = c.Session.key = cfg['exec_key'] |
|
205 | key = c.Session.key = ensure_bytes(cfg['exec_key']) | |
206 | xport,addr = cfg['url'].split('://') |
|
206 | xport,addr = cfg['url'].split('://') | |
207 | c.HubFactory.engine_transport = xport |
|
207 | c.HubFactory.engine_transport = xport | |
208 | ip,ports = addr.split(':') |
|
208 | ip,ports = addr.split(':') | |
209 | c.HubFactory.engine_ip = ip |
|
209 | c.HubFactory.engine_ip = ip | |
210 | c.HubFactory.regport = int(ports) |
|
210 | c.HubFactory.regport = int(ports) | |
211 | self.location = cfg['location'] |
|
211 | self.location = cfg['location'] | |
212 |
|
||||
213 | # load client config |
|
212 | # load client config | |
214 | with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f: |
|
213 | with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f: | |
215 | cfg = json.loads(f.read()) |
|
214 | cfg = json.loads(f.read()) | |
@@ -240,9 +239,9 class IPControllerApp(BaseParallelApplication): | |||||
240 | # with open(keyfile, 'w') as f: |
|
239 | # with open(keyfile, 'w') as f: | |
241 | # f.write(key) |
|
240 | # f.write(key) | |
242 | # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) |
|
241 | # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) | |
243 | c.Session.key = key |
|
242 | c.Session.key = ensure_bytes(key) | |
244 | else: |
|
243 | else: | |
245 | key = c.Session.key = '' |
|
244 | key = c.Session.key = b'' | |
246 |
|
245 | |||
247 | try: |
|
246 | try: | |
248 | self.factory = HubFactory(config=c, log=self.log) |
|
247 | self.factory = HubFactory(config=c, log=self.log) | |
@@ -273,27 +272,27 class IPControllerApp(BaseParallelApplication): | |||||
273 | hub = self.factory |
|
272 | hub = self.factory | |
274 | # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url |
|
273 | # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url | |
275 | # IOPub relay (in a Process) |
|
274 | # IOPub relay (in a Process) | |
276 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') |
|
275 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') | |
277 | q.bind_in(hub.client_info['iopub']) |
|
276 | q.bind_in(hub.client_info['iopub']) | |
278 | q.bind_out(hub.engine_info['iopub']) |
|
277 | q.bind_out(hub.engine_info['iopub']) | |
279 | q.setsockopt_out(zmq.SUBSCRIBE, '') |
|
278 | q.setsockopt_out(zmq.SUBSCRIBE, b'') | |
280 | q.connect_mon(hub.monitor_url) |
|
279 | q.connect_mon(hub.monitor_url) | |
281 | q.daemon=True |
|
280 | q.daemon=True | |
282 | children.append(q) |
|
281 | children.append(q) | |
283 |
|
282 | |||
284 | # Multiplexer Queue (in a Process) |
|
283 | # Multiplexer Queue (in a Process) | |
285 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') |
|
284 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out') | |
286 | q.bind_in(hub.client_info['mux']) |
|
285 | q.bind_in(hub.client_info['mux']) | |
287 | q.setsockopt_in(zmq.IDENTITY, 'mux') |
|
286 | q.setsockopt_in(zmq.IDENTITY, b'mux') | |
288 | q.bind_out(hub.engine_info['mux']) |
|
287 | q.bind_out(hub.engine_info['mux']) | |
289 | q.connect_mon(hub.monitor_url) |
|
288 | q.connect_mon(hub.monitor_url) | |
290 | q.daemon=True |
|
289 | q.daemon=True | |
291 | children.append(q) |
|
290 | children.append(q) | |
292 |
|
291 | |||
293 | # Control Queue (in a Process) |
|
292 | # Control Queue (in a Process) | |
294 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') |
|
293 | q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol') | |
295 | q.bind_in(hub.client_info['control']) |
|
294 | q.bind_in(hub.client_info['control']) | |
296 | q.setsockopt_in(zmq.IDENTITY, 'control') |
|
295 | q.setsockopt_in(zmq.IDENTITY, b'control') | |
297 | q.bind_out(hub.engine_info['control']) |
|
296 | q.bind_out(hub.engine_info['control']) | |
298 | q.connect_mon(hub.monitor_url) |
|
297 | q.connect_mon(hub.monitor_url) | |
299 | q.daemon=True |
|
298 | q.daemon=True | |
@@ -305,10 +304,10 class IPControllerApp(BaseParallelApplication): | |||||
305 | # Task Queue (in a Process) |
|
304 | # Task Queue (in a Process) | |
306 | if scheme == 'pure': |
|
305 | if scheme == 'pure': | |
307 | self.log.warn("task::using pure XREQ Task scheduler") |
|
306 | self.log.warn("task::using pure XREQ Task scheduler") | |
308 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') |
|
307 | q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask') | |
309 | # q.setsockopt_out(zmq.HWM, hub.hwm) |
|
308 | # q.setsockopt_out(zmq.HWM, hub.hwm) | |
310 | q.bind_in(hub.client_info['task'][1]) |
|
309 | q.bind_in(hub.client_info['task'][1]) | |
311 | q.setsockopt_in(zmq.IDENTITY, 'task') |
|
310 | q.setsockopt_in(zmq.IDENTITY, b'task') | |
312 | q.bind_out(hub.engine_info['task']) |
|
311 | q.bind_out(hub.engine_info['task']) | |
313 | q.connect_mon(hub.monitor_url) |
|
312 | q.connect_mon(hub.monitor_url) | |
314 | q.daemon=True |
|
313 | q.daemon=True |
@@ -41,7 +41,7 from IPython.config.configurable import Configurable | |||||
41 | from IPython.zmq.session import Session |
|
41 | from IPython.zmq.session import Session | |
42 | from IPython.parallel.engine.engine import EngineFactory |
|
42 | from IPython.parallel.engine.engine import EngineFactory | |
43 | from IPython.parallel.engine.streamkernel import Kernel |
|
43 | from IPython.parallel.engine.streamkernel import Kernel | |
44 | from IPython.parallel.util import disambiguate_url |
|
44 | from IPython.parallel.util import disambiguate_url, ensure_bytes | |
45 |
|
45 | |||
46 | from IPython.utils.importstring import import_item |
|
46 | from IPython.utils.importstring import import_item | |
47 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float |
|
47 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float | |
@@ -216,11 +216,8 class IPEngineApp(BaseParallelApplication): | |||||
216 | self.log.info("Loading url_file %r"%self.url_file) |
|
216 | self.log.info("Loading url_file %r"%self.url_file) | |
217 | with open(self.url_file) as f: |
|
217 | with open(self.url_file) as f: | |
218 | d = json.loads(f.read()) |
|
218 | d = json.loads(f.read()) | |
219 | for k,v in d.iteritems(): |
|
|||
220 | if isinstance(v, unicode): |
|
|||
221 | d[k] = v.encode() |
|
|||
222 | if d['exec_key']: |
|
219 | if d['exec_key']: | |
223 | config.Session.key = d['exec_key'] |
|
220 | config.Session.key = ensure_bytes(d['exec_key']) | |
224 | d['url'] = disambiguate_url(d['url'], d['location']) |
|
221 | d['url'] = disambiguate_url(d['url'], d['location']) | |
225 | config.EngineFactory.url = d['url'] |
|
222 | config.EngineFactory.url = d['url'] | |
226 | config.EngineFactory.location = d['location'] |
|
223 | config.EngineFactory.location = d['location'] |
@@ -17,6 +17,7 Authors: | |||||
17 |
|
17 | |||
18 | import os |
|
18 | import os | |
19 | import json |
|
19 | import json | |
|
20 | import sys | |||
20 | import time |
|
21 | import time | |
21 | import warnings |
|
22 | import warnings | |
22 | from datetime import datetime |
|
23 | from datetime import datetime | |
@@ -48,6 +49,11 from .asyncresult import AsyncResult, AsyncHubResult | |||||
48 | from IPython.core.profiledir import ProfileDir, ProfileDirError |
|
49 | from IPython.core.profiledir import ProfileDir, ProfileDirError | |
49 | from .view import DirectView, LoadBalancedView |
|
50 | from .view import DirectView, LoadBalancedView | |
50 |
|
51 | |||
|
52 | if sys.version_info[0] >= 3: | |||
|
53 | # xrange is used in a coupe 'isinstance' tests in py2 | |||
|
54 | # should be just 'range' in 3k | |||
|
55 | xrange = range | |||
|
56 | ||||
51 | #-------------------------------------------------------------------------- |
|
57 | #-------------------------------------------------------------------------- | |
52 | # Decorators for Client methods |
|
58 | # Decorators for Client methods | |
53 | #-------------------------------------------------------------------------- |
|
59 | #-------------------------------------------------------------------------- | |
@@ -356,13 +362,12 class Client(HasTraits): | |||||
356 | if os.path.isfile(exec_key): |
|
362 | if os.path.isfile(exec_key): | |
357 | extra_args['keyfile'] = exec_key |
|
363 | extra_args['keyfile'] = exec_key | |
358 | else: |
|
364 | else: | |
359 |
|
|
365 | exec_key = util.ensure_bytes(exec_key) | |
360 | exec_key = exec_key.encode('ascii') |
|
|||
361 | extra_args['key'] = exec_key |
|
366 | extra_args['key'] = exec_key | |
362 | self.session = Session(**extra_args) |
|
367 | self.session = Session(**extra_args) | |
363 |
|
368 | |||
364 | self._query_socket = self._context.socket(zmq.XREQ) |
|
369 | self._query_socket = self._context.socket(zmq.XREQ) | |
365 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
370 | self._query_socket.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) | |
366 | if self._ssh: |
|
371 | if self._ssh: | |
367 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) |
|
372 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) | |
368 | else: |
|
373 | else: | |
@@ -404,7 +409,7 class Client(HasTraits): | |||||
404 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" |
|
409 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" | |
405 | for k,v in engines.iteritems(): |
|
410 | for k,v in engines.iteritems(): | |
406 | eid = int(k) |
|
411 | eid = int(k) | |
407 |
self._engines[eid] = |
|
412 | self._engines[eid] = v | |
408 | self._ids.append(eid) |
|
413 | self._ids.append(eid) | |
409 | self._ids = sorted(self._ids) |
|
414 | self._ids = sorted(self._ids) | |
410 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
415 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | |
@@ -455,7 +460,7 class Client(HasTraits): | |||||
455 | if not isinstance(targets, (tuple, list, xrange)): |
|
460 | if not isinstance(targets, (tuple, list, xrange)): | |
456 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) |
|
461 | raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) | |
457 |
|
462 | |||
458 | return [self._engines[t] for t in targets], list(targets) |
|
463 | return [util.ensure_bytes(self._engines[t]) for t in targets], list(targets) | |
459 |
|
464 | |||
460 | def _connect(self, sshserver, ssh_kwargs, timeout): |
|
465 | def _connect(self, sshserver, ssh_kwargs, timeout): | |
461 | """setup all our socket connections to the cluster. This is called from |
|
466 | """setup all our socket connections to the cluster. This is called from | |
@@ -488,14 +493,15 class Client(HasTraits): | |||||
488 | content = msg.content |
|
493 | content = msg.content | |
489 | self._config['registration'] = dict(content) |
|
494 | self._config['registration'] = dict(content) | |
490 | if content.status == 'ok': |
|
495 | if content.status == 'ok': | |
|
496 | ident = util.ensure_bytes(self.session.session) | |||
491 | if content.mux: |
|
497 | if content.mux: | |
492 | self._mux_socket = self._context.socket(zmq.XREQ) |
|
498 | self._mux_socket = self._context.socket(zmq.XREQ) | |
493 |
self._mux_socket.setsockopt(zmq.IDENTITY, |
|
499 | self._mux_socket.setsockopt(zmq.IDENTITY, ident) | |
494 | connect_socket(self._mux_socket, content.mux) |
|
500 | connect_socket(self._mux_socket, content.mux) | |
495 | if content.task: |
|
501 | if content.task: | |
496 | self._task_scheme, task_addr = content.task |
|
502 | self._task_scheme, task_addr = content.task | |
497 | self._task_socket = self._context.socket(zmq.XREQ) |
|
503 | self._task_socket = self._context.socket(zmq.XREQ) | |
498 |
self._task_socket.setsockopt(zmq.IDENTITY, |
|
504 | self._task_socket.setsockopt(zmq.IDENTITY, ident) | |
499 | connect_socket(self._task_socket, task_addr) |
|
505 | connect_socket(self._task_socket, task_addr) | |
500 | if content.notification: |
|
506 | if content.notification: | |
501 | self._notification_socket = self._context.socket(zmq.SUB) |
|
507 | self._notification_socket = self._context.socket(zmq.SUB) | |
@@ -507,12 +513,12 class Client(HasTraits): | |||||
507 | # connect_socket(self._query_socket, content.query) |
|
513 | # connect_socket(self._query_socket, content.query) | |
508 | if content.control: |
|
514 | if content.control: | |
509 | self._control_socket = self._context.socket(zmq.XREQ) |
|
515 | self._control_socket = self._context.socket(zmq.XREQ) | |
510 |
self._control_socket.setsockopt(zmq.IDENTITY, |
|
516 | self._control_socket.setsockopt(zmq.IDENTITY, ident) | |
511 | connect_socket(self._control_socket, content.control) |
|
517 | connect_socket(self._control_socket, content.control) | |
512 | if content.iopub: |
|
518 | if content.iopub: | |
513 | self._iopub_socket = self._context.socket(zmq.SUB) |
|
519 | self._iopub_socket = self._context.socket(zmq.SUB) | |
514 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') |
|
520 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
515 |
self._iopub_socket.setsockopt(zmq.IDENTITY, |
|
521 | self._iopub_socket.setsockopt(zmq.IDENTITY, ident) | |
516 | connect_socket(self._iopub_socket, content.iopub) |
|
522 | connect_socket(self._iopub_socket, content.iopub) | |
517 | self._update_engines(dict(content.engines)) |
|
523 | self._update_engines(dict(content.engines)) | |
518 | else: |
|
524 | else: |
@@ -13,8 +13,6 Authors: | |||||
13 |
|
13 | |||
14 | """ |
|
14 | """ | |
15 |
|
15 | |||
16 | __docformat__ = "restructuredtext en" |
|
|||
17 |
|
||||
18 | #------------------------------------------------------------------------------- |
|
16 | #------------------------------------------------------------------------------- | |
19 | # Copyright (C) 2008-2011 The IPython Development Team |
|
17 | # Copyright (C) 2008-2011 The IPython Development Team | |
20 | # |
|
18 | # | |
@@ -26,6 +24,8 __docformat__ = "restructuredtext en" | |||||
26 | # Imports |
|
24 | # Imports | |
27 | #------------------------------------------------------------------------------- |
|
25 | #------------------------------------------------------------------------------- | |
28 |
|
26 | |||
|
27 | from __future__ import division | |||
|
28 | ||||
29 | import types |
|
29 | import types | |
30 |
|
30 | |||
31 | from IPython.utils.data import flatten as utils_flatten |
|
31 | from IPython.utils.data import flatten as utils_flatten | |
@@ -67,7 +67,7 class Map: | |||||
67 | return |
|
67 | return | |
68 |
|
68 | |||
69 | remainder = len(seq)%q |
|
69 | remainder = len(seq)%q | |
70 | basesize = len(seq)/q |
|
70 | basesize = len(seq)//q | |
71 | hi = [] |
|
71 | hi = [] | |
72 | lo = [] |
|
72 | lo = [] | |
73 | for n in range(q): |
|
73 | for n in range(q): |
@@ -16,6 +16,8 Authors: | |||||
16 | # Imports |
|
16 | # Imports | |
17 | #----------------------------------------------------------------------------- |
|
17 | #----------------------------------------------------------------------------- | |
18 |
|
18 | |||
|
19 | from __future__ import division | |||
|
20 | ||||
19 | import warnings |
|
21 | import warnings | |
20 |
|
22 | |||
21 | from IPython.testing.skipdoctest import skip_doctest |
|
23 | from IPython.testing.skipdoctest import skip_doctest | |
@@ -142,7 +144,7 class ParallelFunction(RemoteFunction): | |||||
142 | balanced = 'Balanced' in self.view.__class__.__name__ |
|
144 | balanced = 'Balanced' in self.view.__class__.__name__ | |
143 | if balanced: |
|
145 | if balanced: | |
144 | if self.chunksize: |
|
146 | if self.chunksize: | |
145 | nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0) |
|
147 | nparts = len_0//self.chunksize + int(len_0%self.chunksize > 0) | |
146 | else: |
|
148 | else: | |
147 | nparts = len_0 |
|
149 | nparts = len_0 | |
148 | targets = [None]*nparts |
|
150 | targets = [None]*nparts | |
@@ -169,7 +171,10 class ParallelFunction(RemoteFunction): | |||||
169 |
|
171 | |||
170 | # print (args) |
|
172 | # print (args) | |
171 | if hasattr(self, '_map'): |
|
173 | if hasattr(self, '_map'): | |
172 | f = map |
|
174 | if sys.version_info[0] >= 3: | |
|
175 | f = lambda f, *sequences: list(map(f, *sequences)) | |||
|
176 | else: | |||
|
177 | f = map | |||
173 | args = [self.func]+args |
|
178 | args = [self.func]+args | |
174 | else: |
|
179 | else: | |
175 | f=self.func |
|
180 | f=self.func |
@@ -969,6 +969,8 class LoadBalancedView(View): | |||||
969 | idents = [] |
|
969 | idents = [] | |
970 | else: |
|
970 | else: | |
971 | idents = self.client._build_targets(targets)[0] |
|
971 | idents = self.client._build_targets(targets)[0] | |
|
972 | # ensure *not* bytes | |||
|
973 | idents = [ ident.decode() for ident in idents ] | |||
972 |
|
974 | |||
973 | after = self._render_dependency(after) |
|
975 | after = self._render_dependency(after) | |
974 | follow = self._render_dependency(follow) |
|
976 | follow = self._render_dependency(follow) |
@@ -25,6 +25,8 from zmq.eventloop import ioloop, zmqstream | |||||
25 | from IPython.config.configurable import LoggingConfigurable |
|
25 | from IPython.config.configurable import LoggingConfigurable | |
26 | from IPython.utils.traitlets import Set, Instance, CFloat |
|
26 | from IPython.utils.traitlets import Set, Instance, CFloat | |
27 |
|
27 | |||
|
28 | from IPython.parallel.util import ensure_bytes | |||
|
29 | ||||
28 | class Heart(object): |
|
30 | class Heart(object): | |
29 | """A basic heart object for responding to a HeartMonitor. |
|
31 | """A basic heart object for responding to a HeartMonitor. | |
30 | This is a simple wrapper with defaults for the most common |
|
32 | This is a simple wrapper with defaults for the most common | |
@@ -42,9 +44,9 class Heart(object): | |||||
42 | self.device.connect_in(in_addr) |
|
44 | self.device.connect_in(in_addr) | |
43 | self.device.connect_out(out_addr) |
|
45 | self.device.connect_out(out_addr) | |
44 | if in_type == zmq.SUB: |
|
46 | if in_type == zmq.SUB: | |
45 | self.device.setsockopt_in(zmq.SUBSCRIBE, "") |
|
47 | self.device.setsockopt_in(zmq.SUBSCRIBE, b"") | |
46 | if heart_id is None: |
|
48 | if heart_id is None: | |
47 |
heart_id = |
|
49 | heart_id = ensure_bytes(uuid.uuid4()) | |
48 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) |
|
50 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) | |
49 | self.id = heart_id |
|
51 | self.id = heart_id | |
50 |
|
52 | |||
@@ -115,7 +117,7 class HeartMonitor(LoggingConfigurable): | |||||
115 | self.responses = set() |
|
117 | self.responses = set() | |
116 | # print self.on_probation, self.hearts |
|
118 | # print self.on_probation, self.hearts | |
117 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) |
|
119 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) | |
118 | self.pingstream.send(str(self.lifetime)) |
|
120 | self.pingstream.send(ensure_bytes(str(self.lifetime))) | |
119 |
|
121 | |||
120 | def handle_new_heart(self, heart): |
|
122 | def handle_new_heart(self, heart): | |
121 | if self._new_handlers: |
|
123 | if self._new_handlers: | |
@@ -140,11 +142,13 class HeartMonitor(LoggingConfigurable): | |||||
140 |
|
142 | |||
141 | def handle_pong(self, msg): |
|
143 | def handle_pong(self, msg): | |
142 | "a heart just beat" |
|
144 | "a heart just beat" | |
143 |
|
|
145 | current = ensure_bytes(str(self.lifetime)) | |
|
146 | last = ensure_bytes(str(self.last_ping)) | |||
|
147 | if msg[1] == current: | |||
144 | delta = time.time()-self.tic |
|
148 | delta = time.time()-self.tic | |
145 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
149 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |
146 | self.responses.add(msg[0]) |
|
150 | self.responses.add(msg[0]) | |
147 |
elif msg[1] == |
|
151 | elif msg[1] == last: | |
148 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) |
|
152 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) | |
149 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) |
|
153 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) | |
150 | self.responses.add(msg[0]) |
|
154 | self.responses.add(msg[0]) |
@@ -289,7 +289,7 class HubFactory(RegistrationFactory): | |||||
289 | # resubmit stream |
|
289 | # resubmit stream | |
290 | r = ZMQStream(ctx.socket(zmq.XREQ), loop) |
|
290 | r = ZMQStream(ctx.socket(zmq.XREQ), loop) | |
291 | url = util.disambiguate_url(self.client_info['task'][-1]) |
|
291 | url = util.disambiguate_url(self.client_info['task'][-1]) | |
292 | r.setsockopt(zmq.IDENTITY, self.session.session) |
|
292 | r.setsockopt(zmq.IDENTITY, util.ensure_bytes(self.session.session)) | |
293 | r.connect(url) |
|
293 | r.connect(url) | |
294 |
|
294 | |||
295 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
295 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | |
@@ -380,14 +380,14 class Hub(SessionFactory): | |||||
380 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) |
|
380 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) | |
381 | self.heartmonitor.add_new_heart_handler(self.handle_new_heart) |
|
381 | self.heartmonitor.add_new_heart_handler(self.handle_new_heart) | |
382 |
|
382 | |||
383 |
self.monitor_handlers = { |
|
383 | self.monitor_handlers = {b'in' : self.save_queue_request, | |
384 | 'out': self.save_queue_result, |
|
384 | b'out': self.save_queue_result, | |
385 | 'intask': self.save_task_request, |
|
385 | b'intask': self.save_task_request, | |
386 | 'outtask': self.save_task_result, |
|
386 | b'outtask': self.save_task_result, | |
387 | 'tracktask': self.save_task_destination, |
|
387 | b'tracktask': self.save_task_destination, | |
388 | 'incontrol': _passer, |
|
388 | b'incontrol': _passer, | |
389 | 'outcontrol': _passer, |
|
389 | b'outcontrol': _passer, | |
390 | 'iopub': self.save_iopub_message, |
|
390 | b'iopub': self.save_iopub_message, | |
391 | } |
|
391 | } | |
392 |
|
392 | |||
393 | self.query_handlers = {'queue_request': self.queue_status, |
|
393 | self.query_handlers = {'queue_request': self.queue_status, | |
@@ -562,8 +562,9 class Hub(SessionFactory): | |||||
562 | return |
|
562 | return | |
563 | record = init_record(msg) |
|
563 | record = init_record(msg) | |
564 | msg_id = record['msg_id'] |
|
564 | msg_id = record['msg_id'] | |
565 | record['engine_uuid'] = queue_id |
|
565 | # Unicode in records | |
566 | record['client_uuid'] = client_id |
|
566 | record['engine_uuid'] = queue_id.decode('utf8', 'replace') | |
|
567 | record['client_uuid'] = client_id.decode('utf8', 'replace') | |||
567 | record['queue'] = 'mux' |
|
568 | record['queue'] = 'mux' | |
568 |
|
569 | |||
569 | try: |
|
570 | try: | |
@@ -751,7 +752,7 class Hub(SessionFactory): | |||||
751 | # print (content) |
|
752 | # print (content) | |
752 | msg_id = content['msg_id'] |
|
753 | msg_id = content['msg_id'] | |
753 | engine_uuid = content['engine_id'] |
|
754 | engine_uuid = content['engine_id'] | |
754 | eid = self.by_ident[engine_uuid] |
|
755 | eid = self.by_ident[util.ensure_bytes(engine_uuid)] | |
755 |
|
756 | |||
756 | self.log.info("task::task %r arrived on %r"%(msg_id, eid)) |
|
757 | self.log.info("task::task %r arrived on %r"%(msg_id, eid)) | |
757 | if msg_id in self.unassigned: |
|
758 | if msg_id in self.unassigned: | |
@@ -833,7 +834,7 class Hub(SessionFactory): | |||||
833 | jsonable = {} |
|
834 | jsonable = {} | |
834 | for k,v in self.keytable.iteritems(): |
|
835 | for k,v in self.keytable.iteritems(): | |
835 | if v not in self.dead_engines: |
|
836 | if v not in self.dead_engines: | |
836 | jsonable[str(k)] = v |
|
837 | jsonable[str(k)] = v.decode() | |
837 | content['engines'] = jsonable |
|
838 | content['engines'] = jsonable | |
838 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
839 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | |
839 |
|
840 | |||
@@ -841,11 +842,13 class Hub(SessionFactory): | |||||
841 | """Register a new engine.""" |
|
842 | """Register a new engine.""" | |
842 | content = msg['content'] |
|
843 | content = msg['content'] | |
843 | try: |
|
844 | try: | |
844 | queue = content['queue'] |
|
845 | queue = util.ensure_bytes(content['queue']) | |
845 | except KeyError: |
|
846 | except KeyError: | |
846 | self.log.error("registration::queue not specified", exc_info=True) |
|
847 | self.log.error("registration::queue not specified", exc_info=True) | |
847 | return |
|
848 | return | |
848 | heart = content.get('heartbeat', None) |
|
849 | heart = content.get('heartbeat', None) | |
|
850 | if heart: | |||
|
851 | heart = util.ensure_bytes(heart) | |||
849 | """register a new engine, and create the socket(s) necessary""" |
|
852 | """register a new engine, and create the socket(s) necessary""" | |
850 | eid = self._next_id |
|
853 | eid = self._next_id | |
851 | # print (eid, queue, reg, heart) |
|
854 | # print (eid, queue, reg, heart) | |
@@ -912,7 +915,7 class Hub(SessionFactory): | |||||
912 | self.log.info("registration::unregister_engine(%r)"%eid) |
|
915 | self.log.info("registration::unregister_engine(%r)"%eid) | |
913 | # print (eid) |
|
916 | # print (eid) | |
914 | uuid = self.keytable[eid] |
|
917 | uuid = self.keytable[eid] | |
915 | content=dict(id=eid, queue=uuid) |
|
918 | content=dict(id=eid, queue=uuid.decode()) | |
916 | self.dead_engines.add(uuid) |
|
919 | self.dead_engines.add(uuid) | |
917 | # self.ids.remove(eid) |
|
920 | # self.ids.remove(eid) | |
918 | # uuid = self.keytable.pop(eid) |
|
921 | # uuid = self.keytable.pop(eid) | |
@@ -980,7 +983,7 class Hub(SessionFactory): | |||||
980 | self.tasks[eid] = list() |
|
983 | self.tasks[eid] = list() | |
981 | self.completed[eid] = list() |
|
984 | self.completed[eid] = list() | |
982 | self.hearts[heart] = eid |
|
985 | self.hearts[heart] = eid | |
983 | content = dict(id=eid, queue=self.engines[eid].queue) |
|
986 | content = dict(id=eid, queue=self.engines[eid].queue.decode()) | |
984 | if self.notifier: |
|
987 | if self.notifier: | |
985 | self.session.send(self.notifier, "registration_notification", content=content) |
|
988 | self.session.send(self.notifier, "registration_notification", content=content) | |
986 | self.log.info("engine::Engine Connected: %i"%eid) |
|
989 | self.log.info("engine::Engine Connected: %i"%eid) | |
@@ -1054,9 +1057,9 class Hub(SessionFactory): | |||||
1054 | queue = len(queue) |
|
1057 | queue = len(queue) | |
1055 | completed = len(completed) |
|
1058 | completed = len(completed) | |
1056 | tasks = len(tasks) |
|
1059 | tasks = len(tasks) | |
1057 |
content[ |
|
1060 | content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} | |
1058 | content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) |
|
1061 | content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) | |
1059 |
|
1062 | # print (content) | ||
1060 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) |
|
1063 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) | |
1061 |
|
1064 | |||
1062 | def purge_results(self, client_id, msg): |
|
1065 | def purge_results(self, client_id, msg): | |
@@ -1179,7 +1182,7 class Hub(SessionFactory): | |||||
1179 | 'io' : io_dict, |
|
1182 | 'io' : io_dict, | |
1180 | } |
|
1183 | } | |
1181 | if rec['result_buffers']: |
|
1184 | if rec['result_buffers']: | |
1182 |
buffers = map( |
|
1185 | buffers = map(bytes, rec['result_buffers']) | |
1183 | else: |
|
1186 | else: | |
1184 | buffers = [] |
|
1187 | buffers = [] | |
1185 |
|
1188 | |||
@@ -1281,7 +1284,7 class Hub(SessionFactory): | |||||
1281 | buffers.extend(rb) |
|
1284 | buffers.extend(rb) | |
1282 | content = dict(status='ok', records=records, buffer_lens=buffer_lens, |
|
1285 | content = dict(status='ok', records=records, buffer_lens=buffer_lens, | |
1283 | result_buffer_lens=result_buffer_lens) |
|
1286 | result_buffer_lens=result_buffer_lens) | |
1284 |
|
1287 | # self.log.debug (content) | ||
1285 | self.session.send(self.query, "db_reply", content=content, |
|
1288 | self.session.send(self.query, "db_reply", content=content, | |
1286 | parent=msg, ident=client_id, |
|
1289 | parent=msg, ident=client_id, | |
1287 | buffers=buffers) |
|
1290 | buffers=buffers) |
@@ -40,11 +40,11 from zmq.eventloop import ioloop, zmqstream | |||||
40 | from IPython.external.decorator import decorator |
|
40 | from IPython.external.decorator import decorator | |
41 | from IPython.config.application import Application |
|
41 | from IPython.config.application import Application | |
42 | from IPython.config.loader import Config |
|
42 | from IPython.config.loader import Config | |
43 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum |
|
43 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes | |
44 |
|
44 | |||
45 | from IPython.parallel import error |
|
45 | from IPython.parallel import error | |
46 | from IPython.parallel.factory import SessionFactory |
|
46 | from IPython.parallel.factory import SessionFactory | |
47 | from IPython.parallel.util import connect_logger, local_logger |
|
47 | from IPython.parallel.util import connect_logger, local_logger, ensure_bytes | |
48 |
|
48 | |||
49 | from .dependency import Dependency |
|
49 | from .dependency import Dependency | |
50 |
|
50 | |||
@@ -174,6 +174,10 class TaskScheduler(SessionFactory): | |||||
174 | blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency |
|
174 | blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency | |
175 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') |
|
175 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') | |
176 |
|
176 | |||
|
177 | ident = CBytes() # ZMQ identity. This should just be self.session.session | |||
|
178 | # but ensure Bytes | |||
|
179 | def _ident_default(self): | |||
|
180 | return ensure_bytes(self.session.session) | |||
177 |
|
181 | |||
178 | def start(self): |
|
182 | def start(self): | |
179 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
183 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
@@ -204,7 +208,7 class TaskScheduler(SessionFactory): | |||||
204 | try: |
|
208 | try: | |
205 | idents,msg = self.session.feed_identities(msg) |
|
209 | idents,msg = self.session.feed_identities(msg) | |
206 | except ValueError: |
|
210 | except ValueError: | |
207 |
self.log.warn("task::Invalid Message: %r" |
|
211 | self.log.warn("task::Invalid Message: %r",msg) | |
208 | return |
|
212 | return | |
209 | try: |
|
213 | try: | |
210 | msg = self.session.unpack_message(msg) |
|
214 | msg = self.session.unpack_message(msg) | |
@@ -219,15 +223,16 class TaskScheduler(SessionFactory): | |||||
219 | self.log.error("Unhandled message type: %r"%msg_type) |
|
223 | self.log.error("Unhandled message type: %r"%msg_type) | |
220 | else: |
|
224 | else: | |
221 | try: |
|
225 | try: | |
222 |
handler( |
|
226 | handler(ensure_bytes(msg['content']['queue'])) | |
223 |
except |
|
227 | except Exception: | |
224 |
self.log.error("task::Invalid notification msg: %r" |
|
228 | self.log.error("task::Invalid notification msg: %r",msg) | |
225 |
|
229 | |||
226 | def _register_engine(self, uid): |
|
230 | def _register_engine(self, uid): | |
227 | """New engine with ident `uid` became available.""" |
|
231 | """New engine with ident `uid` became available.""" | |
228 | # head of the line: |
|
232 | # head of the line: | |
229 | self.targets.insert(0,uid) |
|
233 | self.targets.insert(0,uid) | |
230 | self.loads.insert(0,0) |
|
234 | self.loads.insert(0,0) | |
|
235 | ||||
231 | # initialize sets |
|
236 | # initialize sets | |
232 | self.completed[uid] = set() |
|
237 | self.completed[uid] = set() | |
233 | self.failed[uid] = set() |
|
238 | self.failed[uid] = set() | |
@@ -309,14 +314,18 class TaskScheduler(SessionFactory): | |||||
309 |
|
314 | |||
310 |
|
315 | |||
311 | # send to monitor |
|
316 | # send to monitor | |
312 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) |
|
317 | self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) | |
313 |
|
318 | |||
314 | header = msg['header'] |
|
319 | header = msg['header'] | |
315 | msg_id = header['msg_id'] |
|
320 | msg_id = header['msg_id'] | |
316 | self.all_ids.add(msg_id) |
|
321 | self.all_ids.add(msg_id) | |
317 |
|
322 | |||
318 | # targets |
|
323 | # get targets as a set of bytes objects | |
319 | targets = set(header.get('targets', [])) |
|
324 | # from a list of unicode objects | |
|
325 | targets = header.get('targets', []) | |||
|
326 | targets = map(ensure_bytes, targets) | |||
|
327 | targets = set(targets) | |||
|
328 | ||||
320 | retries = header.get('retries', 0) |
|
329 | retries = header.get('retries', 0) | |
321 | self.retries[msg_id] = retries |
|
330 | self.retries[msg_id] = retries | |
322 |
|
331 | |||
@@ -412,7 +421,7 class TaskScheduler(SessionFactory): | |||||
412 |
|
421 | |||
413 | msg = self.session.send(self.client_stream, 'apply_reply', content, |
|
422 | msg = self.session.send(self.client_stream, 'apply_reply', content, | |
414 | parent=header, ident=idents) |
|
423 | parent=header, ident=idents) | |
415 | self.session.send(self.mon_stream, msg, ident=['outtask']+idents) |
|
424 | self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents) | |
416 |
|
425 | |||
417 | self.update_graph(msg_id, success=False) |
|
426 | self.update_graph(msg_id, success=False) | |
418 |
|
427 | |||
@@ -494,9 +503,9 class TaskScheduler(SessionFactory): | |||||
494 | self.add_job(idx) |
|
503 | self.add_job(idx) | |
495 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) |
|
504 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) | |
496 | # notify Hub |
|
505 | # notify Hub | |
497 | content = dict(msg_id=msg_id, engine_id=target) |
|
506 | content = dict(msg_id=msg_id, engine_id=target.decode()) | |
498 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
507 | self.session.send(self.mon_stream, 'task_destination', content=content, | |
499 |
ident=['tracktask',self. |
|
508 | ident=[b'tracktask',self.ident]) | |
500 |
|
509 | |||
501 |
|
510 | |||
502 | #----------------------------------------------------------------------- |
|
511 | #----------------------------------------------------------------------- | |
@@ -533,7 +542,7 class TaskScheduler(SessionFactory): | |||||
533 | # relay to client and update graph |
|
542 | # relay to client and update graph | |
534 | self.handle_result(idents, parent, raw_msg, success) |
|
543 | self.handle_result(idents, parent, raw_msg, success) | |
535 | # send to Hub monitor |
|
544 | # send to Hub monitor | |
536 | self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False) |
|
545 | self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False) | |
537 | else: |
|
546 | else: | |
538 | self.handle_unmet_dependency(idents, parent) |
|
547 | self.handle_unmet_dependency(idents, parent) | |
539 |
|
548 |
@@ -28,6 +28,12 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates | |||||
28 | # SQLite operators, adapters, and converters |
|
28 | # SQLite operators, adapters, and converters | |
29 | #----------------------------------------------------------------------------- |
|
29 | #----------------------------------------------------------------------------- | |
30 |
|
30 | |||
|
31 | try: | |||
|
32 | buffer | |||
|
33 | except NameError: | |||
|
34 | # py3k | |||
|
35 | buffer = memoryview | |||
|
36 | ||||
31 | operators = { |
|
37 | operators = { | |
32 | '$lt' : "<", |
|
38 | '$lt' : "<", | |
33 | '$gt' : ">", |
|
39 | '$gt' : ">", | |
@@ -54,7 +60,11 def _convert_dict(ds): | |||||
54 | if ds is None: |
|
60 | if ds is None: | |
55 | return ds |
|
61 | return ds | |
56 | else: |
|
62 | else: | |
57 | return extract_dates(json.loads(ds)) |
|
63 | if isinstance(ds, bytes): | |
|
64 | # If I understand the sqlite doc correctly, this will always be utf8 | |||
|
65 | ds = ds.decode('utf8') | |||
|
66 | d = json.loads(ds) | |||
|
67 | return extract_dates(d) | |||
58 |
|
68 | |||
59 | def _adapt_bufs(bufs): |
|
69 | def _adapt_bufs(bufs): | |
60 | # this is *horrible* |
|
70 | # this is *horrible* |
@@ -28,7 +28,7 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode | |||||
28 |
|
28 | |||
29 | from IPython.parallel.controller.heartmonitor import Heart |
|
29 | from IPython.parallel.controller.heartmonitor import Heart | |
30 | from IPython.parallel.factory import RegistrationFactory |
|
30 | from IPython.parallel.factory import RegistrationFactory | |
31 | from IPython.parallel.util import disambiguate_url |
|
31 | from IPython.parallel.util import disambiguate_url, ensure_bytes | |
32 |
|
32 | |||
33 | from IPython.zmq.session import Message |
|
33 | from IPython.zmq.session import Message | |
34 |
|
34 | |||
@@ -65,7 +65,7 class EngineFactory(RegistrationFactory): | |||||
65 | ctx = self.context |
|
65 | ctx = self.context | |
66 |
|
66 | |||
67 | reg = ctx.socket(zmq.XREQ) |
|
67 | reg = ctx.socket(zmq.XREQ) | |
68 | reg.setsockopt(zmq.IDENTITY, self.ident) |
|
68 | reg.setsockopt(zmq.IDENTITY, ensure_bytes(self.ident)) | |
69 | reg.connect(self.url) |
|
69 | reg.connect(self.url) | |
70 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
70 | self.registrar = zmqstream.ZMQStream(reg, self.loop) | |
71 |
|
71 | |||
@@ -83,7 +83,7 class EngineFactory(RegistrationFactory): | |||||
83 | self._abort_dc.stop() |
|
83 | self._abort_dc.stop() | |
84 | ctx = self.context |
|
84 | ctx = self.context | |
85 | loop = self.loop |
|
85 | loop = self.loop | |
86 | identity = self.ident |
|
86 | identity = ensure_bytes(self.ident) | |
87 |
|
87 | |||
88 | idents,msg = self.session.feed_identities(msg) |
|
88 | idents,msg = self.session.feed_identities(msg) | |
89 | msg = Message(self.session.unpack_message(msg)) |
|
89 | msg = Message(self.session.unpack_message(msg)) |
@@ -35,12 +35,12 import zmq | |||||
35 | from zmq.eventloop import ioloop, zmqstream |
|
35 | from zmq.eventloop import ioloop, zmqstream | |
36 |
|
36 | |||
37 | # Local imports. |
|
37 | # Local imports. | |
38 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode |
|
38 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes | |
39 | from IPython.zmq.completer import KernelCompleter |
|
39 | from IPython.zmq.completer import KernelCompleter | |
40 |
|
40 | |||
41 | from IPython.parallel.error import wrap_exception |
|
41 | from IPython.parallel.error import wrap_exception | |
42 | from IPython.parallel.factory import SessionFactory |
|
42 | from IPython.parallel.factory import SessionFactory | |
43 | from IPython.parallel.util import serialize_object, unpack_apply_message |
|
43 | from IPython.parallel.util import serialize_object, unpack_apply_message, ensure_bytes | |
44 |
|
44 | |||
45 | def printer(*args): |
|
45 | def printer(*args): | |
46 | pprint(args, stream=sys.__stdout__) |
|
46 | pprint(args, stream=sys.__stdout__) | |
@@ -73,8 +73,14 class Kernel(SessionFactory): | |||||
73 | # kwargs: |
|
73 | # kwargs: | |
74 | exec_lines = List(Unicode, config=True, |
|
74 | exec_lines = List(Unicode, config=True, | |
75 | help="List of lines to execute") |
|
75 | help="List of lines to execute") | |
76 |
|
76 | |||
|
77 | # identities: | |||
77 | int_id = Int(-1) |
|
78 | int_id = Int(-1) | |
|
79 | bident = CBytes() | |||
|
80 | ident = Unicode() | |||
|
81 | def _ident_changed(self, name, old, new): | |||
|
82 | self.bident = ensure_bytes(new) | |||
|
83 | ||||
78 | user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""") |
|
84 | user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""") | |
79 |
|
85 | |||
80 | control_stream = Instance(zmqstream.ZMQStream) |
|
86 | control_stream = Instance(zmqstream.ZMQStream) | |
@@ -193,6 +199,8 class Kernel(SessionFactory): | |||||
193 | except: |
|
199 | except: | |
194 | self.log.error("Invalid Message", exc_info=True) |
|
200 | self.log.error("Invalid Message", exc_info=True) | |
195 | return |
|
201 | return | |
|
202 | else: | |||
|
203 | self.log.debug("Control received, %s", msg) | |||
196 |
|
204 | |||
197 | header = msg['header'] |
|
205 | header = msg['header'] | |
198 | msg_id = header['msg_id'] |
|
206 | msg_id = header['msg_id'] | |
@@ -247,7 +255,7 class Kernel(SessionFactory): | |||||
247 | self.log.error("Got bad msg: %s"%parent, exc_info=True) |
|
255 | self.log.error("Got bad msg: %s"%parent, exc_info=True) | |
248 | return |
|
256 | return | |
249 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, |
|
257 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, | |
250 | ident='%s.pyin'%self.prefix) |
|
258 | ident=ensure_bytes('%s.pyin'%self.prefix)) | |
251 | started = datetime.now() |
|
259 | started = datetime.now() | |
252 | try: |
|
260 | try: | |
253 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
261 | comp_code = self.compiler(code, '<zmq-kernel>') | |
@@ -261,7 +269,7 class Kernel(SessionFactory): | |||||
261 | exc_content = self._wrap_exception('execute') |
|
269 | exc_content = self._wrap_exception('execute') | |
262 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
270 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
263 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
271 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
264 | ident='%s.pyerr'%self.prefix) |
|
272 | ident=ensure_bytes('%s.pyerr'%self.prefix)) | |
265 | reply_content = exc_content |
|
273 | reply_content = exc_content | |
266 | else: |
|
274 | else: | |
267 | reply_content = {'status' : 'ok'} |
|
275 | reply_content = {'status' : 'ok'} | |
@@ -285,7 +293,6 class Kernel(SessionFactory): | |||||
285 | def apply_request(self, stream, ident, parent): |
|
293 | def apply_request(self, stream, ident, parent): | |
286 | # flush previous reply, so this request won't block it |
|
294 | # flush previous reply, so this request won't block it | |
287 | stream.flush(zmq.POLLOUT) |
|
295 | stream.flush(zmq.POLLOUT) | |
288 |
|
||||
289 | try: |
|
296 | try: | |
290 | content = parent[u'content'] |
|
297 | content = parent[u'content'] | |
291 | bufs = parent[u'buffers'] |
|
298 | bufs = parent[u'buffers'] | |
@@ -341,7 +348,7 class Kernel(SessionFactory): | |||||
341 | exc_content = self._wrap_exception('apply') |
|
348 | exc_content = self._wrap_exception('apply') | |
342 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
349 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
343 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, |
|
350 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
344 | ident='%s.pyerr'%self.prefix) |
|
351 | ident=ensure_bytes('%s.pyerr'%self.prefix)) | |
345 | reply_content = exc_content |
|
352 | reply_content = exc_content | |
346 | result_buf = [] |
|
353 | result_buf = [] | |
347 |
|
354 | |||
@@ -370,6 +377,8 class Kernel(SessionFactory): | |||||
370 | except: |
|
377 | except: | |
371 | self.log.error("Invalid Message", exc_info=True) |
|
378 | self.log.error("Invalid Message", exc_info=True) | |
372 | return |
|
379 | return | |
|
380 | else: | |||
|
381 | self.log.debug("Message received, %s", msg) | |||
373 |
|
382 | |||
374 |
|
383 | |||
375 | header = msg['header'] |
|
384 | header = msg['header'] |
@@ -41,9 +41,11 def crash(): | |||||
41 | if sys.platform.startswith('win'): |
|
41 | if sys.platform.startswith('win'): | |
42 | import ctypes |
|
42 | import ctypes | |
43 | ctypes.windll.kernel32.SetErrorMode(0x0002); |
|
43 | ctypes.windll.kernel32.SetErrorMode(0x0002); | |
44 |
|
44 | args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b''] | ||
45 | co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00', |
|
45 | if sys.version_info[0] >= 3: | |
46 | (), (), (), '', '', 1, b'') |
|
46 | args.insert(1, 0) | |
|
47 | ||||
|
48 | co = types.CodeType(*args) | |||
47 | exec(co) |
|
49 | exec(co) | |
48 |
|
50 | |||
49 | def wait(n): |
|
51 | def wait(n): |
@@ -57,7 +57,7 class AsyncResultTest(ClusterTestCase): | |||||
57 |
|
57 | |||
58 | def test_get_after_error(self): |
|
58 | def test_get_after_error(self): | |
59 | ar = self.client[-1].apply_async(lambda : 1/0) |
|
59 | ar = self.client[-1].apply_async(lambda : 1/0) | |
60 | ar.wait() |
|
60 | ar.wait(10) | |
61 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
|
61 | self.assertRaisesRemote(ZeroDivisionError, ar.get) | |
62 | self.assertRaisesRemote(ZeroDivisionError, ar.get) |
|
62 | self.assertRaisesRemote(ZeroDivisionError, ar.get) | |
63 | self.assertRaisesRemote(ZeroDivisionError, ar.get_dict) |
|
63 | self.assertRaisesRemote(ZeroDivisionError, ar.get_dict) |
@@ -16,6 +16,8 Authors: | |||||
16 | # Imports |
|
16 | # Imports | |
17 | #------------------------------------------------------------------------------- |
|
17 | #------------------------------------------------------------------------------- | |
18 |
|
18 | |||
|
19 | from __future__ import division | |||
|
20 | ||||
19 | import time |
|
21 | import time | |
20 | from datetime import datetime |
|
22 | from datetime import datetime | |
21 | from tempfile import mktemp |
|
23 | from tempfile import mktemp | |
@@ -132,7 +134,9 class TestClient(ClusterTestCase): | |||||
132 | self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) |
|
134 | self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) | |
133 | allqs = self.client.queue_status() |
|
135 | allqs = self.client.queue_status() | |
134 | self.assertTrue(isinstance(allqs, dict)) |
|
136 | self.assertTrue(isinstance(allqs, dict)) | |
135 | self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned'])) |
|
137 | intkeys = list(allqs.keys()) | |
|
138 | intkeys.remove('unassigned') | |||
|
139 | self.assertEquals(sorted(intkeys), sorted(self.client.ids)) | |||
136 | unassigned = allqs.pop('unassigned') |
|
140 | unassigned = allqs.pop('unassigned') | |
137 | for eid,qs in allqs.items(): |
|
141 | for eid,qs in allqs.items(): | |
138 | self.assertTrue(isinstance(qs, dict)) |
|
142 | self.assertTrue(isinstance(qs, dict)) | |
@@ -156,7 +160,7 class TestClient(ClusterTestCase): | |||||
156 | def test_db_query_dt(self): |
|
160 | def test_db_query_dt(self): | |
157 | """test db query by date""" |
|
161 | """test db query by date""" | |
158 | hist = self.client.hub_history() |
|
162 | hist = self.client.hub_history() | |
159 | middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0] |
|
163 | middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0] | |
160 | tic = middle['submitted'] |
|
164 | tic = middle['submitted'] | |
161 | before = self.client.db_query({'submitted' : {'$lt' : tic}}) |
|
165 | before = self.client.db_query({'submitted' : {'$lt' : tic}}) | |
162 | after = self.client.db_query({'submitted' : {'$gte' : tic}}) |
|
166 | after = self.client.db_query({'submitted' : {'$gte' : tic}}) |
@@ -16,6 +16,7 Authors: | |||||
16 | # Imports |
|
16 | # Imports | |
17 | #------------------------------------------------------------------------------- |
|
17 | #------------------------------------------------------------------------------- | |
18 |
|
18 | |||
|
19 | from __future__ import division | |||
19 |
|
20 | |||
20 | import tempfile |
|
21 | import tempfile | |
21 | import time |
|
22 | import time | |
@@ -100,7 +101,7 class TestDictBackend(TestCase): | |||||
100 | def test_find_records_dt(self): |
|
101 | def test_find_records_dt(self): | |
101 | """test finding records by date""" |
|
102 | """test finding records by date""" | |
102 | hist = self.db.get_history() |
|
103 | hist = self.db.get_history() | |
103 | middle = self.db.get_record(hist[len(hist)/2]) |
|
104 | middle = self.db.get_record(hist[len(hist)//2]) | |
104 | tic = middle['submitted'] |
|
105 | tic = middle['submitted'] | |
105 | before = self.db.find_records({'submitted' : {'$lt' : tic}}) |
|
106 | before = self.db.find_records({'submitted' : {'$lt' : tic}}) | |
106 | after = self.db.find_records({'submitted' : {'$gte' : tic}}) |
|
107 | after = self.db.find_records({'submitted' : {'$gte' : tic}}) | |
@@ -168,7 +169,7 class TestDictBackend(TestCase): | |||||
168 | query = {'msg_id' : {'$in':msg_ids}} |
|
169 | query = {'msg_id' : {'$in':msg_ids}} | |
169 | self.db.drop_matching_records(query) |
|
170 | self.db.drop_matching_records(query) | |
170 | recs = self.db.find_records(query) |
|
171 | recs = self.db.find_records(query) | |
171 |
self.assert |
|
172 | self.assertEquals(len(recs), 0) | |
172 |
|
173 | |||
173 | class TestSQLiteBackend(TestDictBackend): |
|
174 | class TestSQLiteBackend(TestDictBackend): | |
174 | def create_db(self): |
|
175 | def create_db(self): |
@@ -43,7 +43,7 class TestView(ClusterTestCase): | |||||
43 | # self.add_engines(1) |
|
43 | # self.add_engines(1) | |
44 | eid = self.client.ids[-1] |
|
44 | eid = self.client.ids[-1] | |
45 | ar = self.client[eid].apply_async(crash) |
|
45 | ar = self.client[eid].apply_async(crash) | |
46 | self.assertRaisesRemote(error.EngineError, ar.get) |
|
46 | self.assertRaisesRemote(error.EngineError, ar.get, 10) | |
47 | eid = ar.engine_id |
|
47 | eid = ar.engine_id | |
48 | tic = time.time() |
|
48 | tic = time.time() | |
49 | while eid in self.client.ids and time.time()-tic < 5: |
|
49 | while eid in self.client.ids and time.time()-tic < 5: | |
@@ -413,7 +413,10 class TestView(ClusterTestCase): | |||||
413 | """test executing unicode strings""" |
|
413 | """test executing unicode strings""" | |
414 | v = self.client[-1] |
|
414 | v = self.client[-1] | |
415 | v.block=True |
|
415 | v.block=True | |
416 | code=u"a=u'é'" |
|
416 | if sys.version_info[0] >= 3: | |
|
417 | code="a='é'" | |||
|
418 | else: | |||
|
419 | code=u"a=u'é'" | |||
417 | v.execute(code) |
|
420 | v.execute(code) | |
418 | self.assertEquals(v['a'], u'é') |
|
421 | self.assertEquals(v['a'], u'é') | |
419 |
|
422 | |||
@@ -433,7 +436,7 class TestView(ClusterTestCase): | |||||
433 | assert isinstance(check, bytes), "%r is not bytes"%check |
|
436 | assert isinstance(check, bytes), "%r is not bytes"%check | |
434 | assert a.encode('utf8') == check, "%s != %s"%(a,check) |
|
437 | assert a.encode('utf8') == check, "%s != %s"%(a,check) | |
435 |
|
438 | |||
436 |
for s in [ u'é', u'ßø®∫','asdf' |
|
439 | for s in [ u'é', u'ßø®∫',u'asdf' ]: | |
437 | try: |
|
440 | try: | |
438 | v.apply_sync(check_unicode, s, s.encode('utf8')) |
|
441 | v.apply_sync(check_unicode, s, s.encode('utf8')) | |
439 | except error.RemoteError as e: |
|
442 | except error.RemoteError as e: |
@@ -101,6 +101,12 class ReverseDict(dict): | |||||
101 | # Functions |
|
101 | # Functions | |
102 | #----------------------------------------------------------------------------- |
|
102 | #----------------------------------------------------------------------------- | |
103 |
|
103 | |||
|
104 | def ensure_bytes(s): | |||
|
105 | """ensure that an object is bytes""" | |||
|
106 | if isinstance(s, unicode): | |||
|
107 | s = s.encode(sys.getdefaultencoding(), 'replace') | |||
|
108 | return s | |||
|
109 | ||||
104 | def validate_url(url): |
|
110 | def validate_url(url): | |
105 | """validate a url for zeromq""" |
|
111 | """validate a url for zeromq""" | |
106 | if not isinstance(url, basestring): |
|
112 | if not isinstance(url, basestring): |
@@ -23,6 +23,7 __docformat__ = "restructuredtext en" | |||||
23 | # Imports |
|
23 | # Imports | |
24 | #------------------------------------------------------------------------------- |
|
24 | #------------------------------------------------------------------------------- | |
25 |
|
25 | |||
|
26 | import sys | |||
26 | import new, types, copy_reg |
|
27 | import new, types, copy_reg | |
27 |
|
28 | |||
28 | def code_ctor(*args): |
|
29 | def code_ctor(*args): | |
@@ -31,9 +32,12 def code_ctor(*args): | |||||
31 | def reduce_code(co): |
|
32 | def reduce_code(co): | |
32 | if co.co_freevars or co.co_cellvars: |
|
33 | if co.co_freevars or co.co_cellvars: | |
33 | raise ValueError("Sorry, cannot pickle code objects with closures") |
|
34 | raise ValueError("Sorry, cannot pickle code objects with closures") | |
34 |
|
|
35 | args = [co.co_argcount, co.co_nlocals, co.co_stacksize, | |
35 | co.co_flags, co.co_code, co.co_consts, co.co_names, |
|
36 | co.co_flags, co.co_code, co.co_consts, co.co_names, | |
36 | co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, |
|
37 | co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, | |
37 |
co.co_lnotab |
|
38 | co.co_lnotab] | |
|
39 | if sys.version_info[0] >= 3: | |||
|
40 | args.insert(1, co.co_kwonlyargcount) | |||
|
41 | return code_ctor, tuple(args) | |||
38 |
|
42 | |||
39 | copy_reg.pickle(types.CodeType, reduce_code) No newline at end of file |
|
43 | copy_reg.pickle(types.CodeType, reduce_code) |
@@ -19,16 +19,23 __test__ = {} | |||||
19 | # Imports |
|
19 | # Imports | |
20 | #------------------------------------------------------------------------------- |
|
20 | #------------------------------------------------------------------------------- | |
21 |
|
21 | |||
|
22 | import sys | |||
22 | import cPickle as pickle |
|
23 | import cPickle as pickle | |
23 |
|
24 | |||
24 | try: |
|
25 | try: | |
25 | import numpy |
|
26 | import numpy | |
26 | except ImportError: |
|
27 | except ImportError: | |
27 | pass |
|
28 | numpy = None | |
28 |
|
29 | |||
29 | class SerializationError(Exception): |
|
30 | class SerializationError(Exception): | |
30 | pass |
|
31 | pass | |
31 |
|
32 | |||
|
33 | if sys.version_info[0] >= 3: | |||
|
34 | buffer = memoryview | |||
|
35 | py3k = True | |||
|
36 | else: | |||
|
37 | py3k = False | |||
|
38 | ||||
32 | #----------------------------------------------------------------------------- |
|
39 | #----------------------------------------------------------------------------- | |
33 | # Classes and functions |
|
40 | # Classes and functions | |
34 | #----------------------------------------------------------------------------- |
|
41 | #----------------------------------------------------------------------------- | |
@@ -93,8 +100,11 class SerializeIt(object): | |||||
93 | def __init__(self, unSerialized): |
|
100 | def __init__(self, unSerialized): | |
94 | self.data = None |
|
101 | self.data = None | |
95 | self.obj = unSerialized.getObject() |
|
102 | self.obj = unSerialized.getObject() | |
96 |
if |
|
103 | if numpy is not None and isinstance(self.obj, numpy.ndarray): | |
97 | if len(self.obj.shape) == 0: # length 0 arrays are just pickled |
|
104 | if py3k or len(self.obj.shape) == 0: # length 0 arrays are just pickled | |
|
105 | # FIXME: | |||
|
106 | # also use pickle for numpy arrays on py3k, since | |||
|
107 | # pyzmq doesn't rebuild from memoryviews properly | |||
98 | self.typeDescriptor = 'pickle' |
|
108 | self.typeDescriptor = 'pickle' | |
99 | self.metadata = {} |
|
109 | self.metadata = {} | |
100 | else: |
|
110 | else: | |
@@ -102,7 +112,7 class SerializeIt(object): | |||||
102 | self.typeDescriptor = 'ndarray' |
|
112 | self.typeDescriptor = 'ndarray' | |
103 | self.metadata = {'shape':self.obj.shape, |
|
113 | self.metadata = {'shape':self.obj.shape, | |
104 | 'dtype':self.obj.dtype.str} |
|
114 | 'dtype':self.obj.dtype.str} | |
105 |
elif isinstance(self.obj, |
|
115 | elif isinstance(self.obj, bytes): | |
106 | self.typeDescriptor = 'bytes' |
|
116 | self.typeDescriptor = 'bytes' | |
107 | self.metadata = {} |
|
117 | self.metadata = {} | |
108 | elif isinstance(self.obj, buffer): |
|
118 | elif isinstance(self.obj, buffer): | |
@@ -146,9 +156,9 class UnSerializeIt(UnSerialized): | |||||
146 |
|
156 | |||
147 | def getObject(self): |
|
157 | def getObject(self): | |
148 | typeDescriptor = self.serialized.getTypeDescriptor() |
|
158 | typeDescriptor = self.serialized.getTypeDescriptor() | |
149 |
if |
|
159 | if numpy is not None and typeDescriptor == 'ndarray': | |
150 | buf = self.serialized.getData() |
|
160 | buf = self.serialized.getData() | |
151 |
if isinstance(buf, ( |
|
161 | if isinstance(buf, (bytes, buffer)): | |
152 | result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype']) |
|
162 | result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype']) | |
153 | else: |
|
163 | else: | |
154 | # memoryview |
|
164 | # memoryview |
General Comments 0
You need to be logged in to leave comments.
Login now