Show More
@@ -0,0 +1,18 b'' | |||||
|
1 | """The IPython ZMQ-based parallel computing interface.""" | |||
|
2 | #----------------------------------------------------------------------------- | |||
|
3 | # Copyright (C) 2011 The IPython Development Team | |||
|
4 | # | |||
|
5 | # Distributed under the terms of the BSD License. The full license is in | |||
|
6 | # the file COPYING, distributed as part of this software. | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | ||||
|
9 | #----------------------------------------------------------------------------- | |||
|
10 | # Imports | |||
|
11 | #----------------------------------------------------------------------------- | |||
|
12 | ||||
|
13 | from .asyncresult import * | |||
|
14 | from .client import Client | |||
|
15 | from .dependency import * | |||
|
16 | from .remotefunction import * | |||
|
17 | from .view import * | |||
|
18 |
@@ -30,7 +30,7 b' def check_ready(f, self, *args, **kwargs):' | |||||
30 | class AsyncResult(object): |
|
30 | class AsyncResult(object): | |
31 | """Class for representing results of non-blocking calls. |
|
31 | """Class for representing results of non-blocking calls. | |
32 |
|
32 | |||
33 | Provides the same interface as :py:class:`multiprocessing.AsyncResult`. |
|
33 | Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`. | |
34 | """ |
|
34 | """ | |
35 |
|
35 | |||
36 | msg_ids = None |
|
36 | msg_ids = None | |
@@ -53,7 +53,8 b' class AsyncResult(object):' | |||||
53 |
|
53 | |||
54 |
|
54 | |||
55 | def _reconstruct_result(self, res): |
|
55 | def _reconstruct_result(self, res): | |
56 | """ |
|
56 | """Reconstruct our result from actual result list (always a list) | |
|
57 | ||||
57 | Override me in subclasses for turning a list of results |
|
58 | Override me in subclasses for turning a list of results | |
58 | into the expected form. |
|
59 | into the expected form. | |
59 | """ |
|
60 | """ | |
@@ -68,7 +69,7 b' class AsyncResult(object):' | |||||
68 | If `timeout` is not ``None`` and the result does not arrive within |
|
69 | If `timeout` is not ``None`` and the result does not arrive within | |
69 | `timeout` seconds then ``TimeoutError`` is raised. If the |
|
70 | `timeout` seconds then ``TimeoutError`` is raised. If the | |
70 | remote call raised an exception then that exception will be reraised |
|
71 | remote call raised an exception then that exception will be reraised | |
71 | by get(). |
|
72 | by get() inside a `RemoteError`. | |
72 | """ |
|
73 | """ | |
73 | if not self.ready(): |
|
74 | if not self.ready(): | |
74 | self.wait(timeout) |
|
75 | self.wait(timeout) | |
@@ -89,6 +90,8 b' class AsyncResult(object):' | |||||
89 |
|
90 | |||
90 | def wait(self, timeout=-1): |
|
91 | def wait(self, timeout=-1): | |
91 | """Wait until the result is available or until `timeout` seconds pass. |
|
92 | """Wait until the result is available or until `timeout` seconds pass. | |
|
93 | ||||
|
94 | This method always returns None. | |||
92 | """ |
|
95 | """ | |
93 | if self._ready: |
|
96 | if self._ready: | |
94 | return |
|
97 | return | |
@@ -118,7 +121,7 b' class AsyncResult(object):' | |||||
118 |
|
121 | |||
119 | Will raise ``AssertionError`` if the result is not ready. |
|
122 | Will raise ``AssertionError`` if the result is not ready. | |
120 | """ |
|
123 | """ | |
121 |
assert self. |
|
124 | assert self.ready() | |
122 | return self._success |
|
125 | return self._success | |
123 |
|
126 | |||
124 | #---------------------------------------------------------------- |
|
127 | #---------------------------------------------------------------- | |
@@ -126,7 +129,11 b' class AsyncResult(object):' | |||||
126 | #---------------------------------------------------------------- |
|
129 | #---------------------------------------------------------------- | |
127 |
|
130 | |||
128 | def get_dict(self, timeout=-1): |
|
131 | def get_dict(self, timeout=-1): | |
129 |
"""Get the results as a dict, keyed by engine_id. |
|
132 | """Get the results as a dict, keyed by engine_id. | |
|
133 | ||||
|
134 | timeout behavior is described in `get()`. | |||
|
135 | """ | |||
|
136 | ||||
130 | results = self.get(timeout) |
|
137 | results = self.get(timeout) | |
131 | engine_ids = [ md['engine_id'] for md in self._metadata ] |
|
138 | engine_ids = [ md['engine_id'] for md in self._metadata ] | |
132 | bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k)) |
|
139 | bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k)) | |
@@ -140,7 +147,7 b' class AsyncResult(object):' | |||||
140 | @property |
|
147 | @property | |
141 | @check_ready |
|
148 | @check_ready | |
142 | def result(self): |
|
149 | def result(self): | |
143 | """result property.""" |
|
150 | """result property wrapper for `get(timeout=0)`.""" | |
144 | return self._result |
|
151 | return self._result | |
145 |
|
152 | |||
146 | # abbreviated alias: |
|
153 | # abbreviated alias: | |
@@ -149,7 +156,7 b' class AsyncResult(object):' | |||||
149 | @property |
|
156 | @property | |
150 | @check_ready |
|
157 | @check_ready | |
151 | def metadata(self): |
|
158 | def metadata(self): | |
152 | """metadata property.""" |
|
159 | """property for accessing execution metadata.""" | |
153 | if self._single_result: |
|
160 | if self._single_result: | |
154 | return self._metadata[0] |
|
161 | return self._metadata[0] | |
155 | else: |
|
162 | else: | |
@@ -186,7 +193,7 b' class AsyncResult(object):' | |||||
186 |
|
193 | |||
187 | @check_ready |
|
194 | @check_ready | |
188 | def __getattr__(self, key): |
|
195 | def __getattr__(self, key): | |
189 | """getattr maps to getitem for convenient access to metadata.""" |
|
196 | """getattr maps to getitem for convenient attr access to metadata.""" | |
190 | if key not in self._metadata[0].keys(): |
|
197 | if key not in self._metadata[0].keys(): | |
191 | raise AttributeError("%r object has no attribute %r"%( |
|
198 | raise AttributeError("%r object has no attribute %r"%( | |
192 | self.__class__.__name__, key)) |
|
199 | self.__class__.__name__, key)) | |
@@ -249,7 +256,11 b' class AsyncMapResult(AsyncResult):' | |||||
249 |
|
256 | |||
250 |
|
257 | |||
251 | class AsyncHubResult(AsyncResult): |
|
258 | class AsyncHubResult(AsyncResult): | |
252 |
"""Class to wrap pending results that must be requested from the Hub |
|
259 | """Class to wrap pending results that must be requested from the Hub. | |
|
260 | ||||
|
261 | Note that waiting/polling on these objects requires polling the Hubover the network, | |||
|
262 | so use `AsyncHubResult.wait()` sparingly. | |||
|
263 | """ | |||
253 |
|
264 | |||
254 | def wait(self, timeout=-1): |
|
265 | def wait(self, timeout=-1): | |
255 | """wait for result to complete.""" |
|
266 | """wait for result to complete.""" |
@@ -32,12 +32,13 b' from IPython.external.ssh import tunnel' | |||||
32 |
|
32 | |||
33 | from . import error |
|
33 | from . import error | |
34 | from . import map as Map |
|
34 | from . import map as Map | |
|
35 | from . import util | |||
35 | from . import streamsession as ss |
|
36 | from . import streamsession as ss | |
36 | from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult |
|
37 | from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult | |
37 | from .clusterdir import ClusterDir, ClusterDirError |
|
38 | from .clusterdir import ClusterDir, ClusterDirError | |
38 | from .dependency import Dependency, depend, require, dependent |
|
39 | from .dependency import Dependency, depend, require, dependent | |
39 | from .remotefunction import remote,parallel,ParallelFunction,RemoteFunction |
|
40 | from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction | |
40 |
from .util import ReverseDict, |
|
41 | from .util import ReverseDict, validate_url, disambiguate_url | |
41 | from .view import DirectView, LoadBalancedView |
|
42 | from .view import DirectView, LoadBalancedView | |
42 |
|
43 | |||
43 | #-------------------------------------------------------------------------- |
|
44 | #-------------------------------------------------------------------------- | |
@@ -489,7 +490,7 b' class Client(HasTraits):' | |||||
489 |
|
490 | |||
490 | def _unwrap_exception(self, content): |
|
491 | def _unwrap_exception(self, content): | |
491 | """unwrap exception, and remap engineid to int.""" |
|
492 | """unwrap exception, and remap engineid to int.""" | |
492 |
e = |
|
493 | e = error.unwrap_exception(content) | |
493 | if e.engine_info: |
|
494 | if e.engine_info: | |
494 | e_uuid = e.engine_info['engine_uuid'] |
|
495 | e_uuid = e.engine_info['engine_uuid'] | |
495 | eid = self._engines[e_uuid] |
|
496 | eid = self._engines[e_uuid] | |
@@ -526,11 +527,11 b' class Client(HasTraits):' | |||||
526 | md['engine_id'] = self._engines.get(md['engine_uuid'], None) |
|
527 | md['engine_id'] = self._engines.get(md['engine_uuid'], None) | |
527 |
|
528 | |||
528 | if 'date' in parent: |
|
529 | if 'date' in parent: | |
529 |
md['submitted'] = datetime.strptime(parent['date'], |
|
530 | md['submitted'] = datetime.strptime(parent['date'], util.ISO8601) | |
530 | if 'started' in header: |
|
531 | if 'started' in header: | |
531 |
md['started'] = datetime.strptime(header['started'], |
|
532 | md['started'] = datetime.strptime(header['started'], util.ISO8601) | |
532 | if 'date' in header: |
|
533 | if 'date' in header: | |
533 |
md['completed'] = datetime.strptime(header['date'], |
|
534 | md['completed'] = datetime.strptime(header['date'], util.ISO8601) | |
534 | return md |
|
535 | return md | |
535 |
|
536 | |||
536 | def _handle_execute_reply(self, msg): |
|
537 | def _handle_execute_reply(self, msg): | |
@@ -573,7 +574,7 b' class Client(HasTraits):' | |||||
573 |
|
574 | |||
574 | # construct result: |
|
575 | # construct result: | |
575 | if content['status'] == 'ok': |
|
576 | if content['status'] == 'ok': | |
576 |
self.results[msg_id] = |
|
577 | self.results[msg_id] = util.unserialize_object(msg['buffers'])[0] | |
577 | elif content['status'] == 'aborted': |
|
578 | elif content['status'] == 'aborted': | |
578 | self.results[msg_id] = error.AbortedTask(msg_id) |
|
579 | self.results[msg_id] = error.AbortedTask(msg_id) | |
579 | elif content['status'] == 'resubmitted': |
|
580 | elif content['status'] == 'resubmitted': | |
@@ -1055,7 +1056,7 b' class Client(HasTraits):' | |||||
1055 | after = self._build_dependency(after) |
|
1056 | after = self._build_dependency(after) | |
1056 | follow = self._build_dependency(follow) |
|
1057 | follow = self._build_dependency(follow) | |
1057 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) |
|
1058 | subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents) | |
1058 |
bufs = |
|
1059 | bufs = util.pack_apply_message(f,args,kwargs) | |
1059 | content = dict(bound=bound) |
|
1060 | content = dict(bound=bound) | |
1060 |
|
1061 | |||
1061 | msg = self.session.send(self._task_socket, "apply_request", |
|
1062 | msg = self.session.send(self._task_socket, "apply_request", | |
@@ -1087,7 +1088,7 b' class Client(HasTraits):' | |||||
1087 |
|
1088 | |||
1088 | subheader = {} |
|
1089 | subheader = {} | |
1089 | content = dict(bound=bound) |
|
1090 | content = dict(bound=bound) | |
1090 |
bufs = |
|
1091 | bufs = util.pack_apply_message(f,args,kwargs) | |
1091 |
|
1092 | |||
1092 | msg_ids = [] |
|
1093 | msg_ids = [] | |
1093 | for ident in idents: |
|
1094 | for ident in idents: | |
@@ -1399,7 +1400,7 b' class Client(HasTraits):' | |||||
1399 | md.update(iodict) |
|
1400 | md.update(iodict) | |
1400 |
|
1401 | |||
1401 | if rcontent['status'] == 'ok': |
|
1402 | if rcontent['status'] == 'ok': | |
1402 |
res,buffers = |
|
1403 | res,buffers = util.unserialize_object(buffers) | |
1403 | else: |
|
1404 | else: | |
1404 | print rcontent |
|
1405 | print rcontent | |
1405 | res = self._unwrap_exception(rcontent) |
|
1406 | res = self._unwrap_exception(rcontent) | |
@@ -1437,7 +1438,7 b' class Client(HasTraits):' | |||||
1437 | status = content.pop('status') |
|
1438 | status = content.pop('status') | |
1438 | if status != 'ok': |
|
1439 | if status != 'ok': | |
1439 | raise self._unwrap_exception(content) |
|
1440 | raise self._unwrap_exception(content) | |
1440 |
return |
|
1441 | return util.rekey(content) | |
1441 |
|
1442 | |||
1442 | @spinfirst |
|
1443 | @spinfirst | |
1443 | def purge_results(self, jobs=[], targets=[]): |
|
1444 | def purge_results(self, jobs=[], targets=[]): | |
@@ -1495,5 +1496,6 b" __all__ = [ 'Client'," | |||||
1495 | 'DirectView', |
|
1496 | 'DirectView', | |
1496 | 'LoadBalancedView', |
|
1497 | 'LoadBalancedView', | |
1497 | 'AsyncResult', |
|
1498 | 'AsyncResult', | |
1498 | 'AsyncMapResult' |
|
1499 | 'AsyncMapResult', | |
|
1500 | 'Reference' | |||
1499 | ] |
|
1501 | ] |
@@ -22,7 +22,6 b' import logging' | |||||
22 | import re |
|
22 | import re | |
23 | import shutil |
|
23 | import shutil | |
24 | import sys |
|
24 | import sys | |
25 | import warnings |
|
|||
26 |
|
25 | |||
27 | from IPython.config.loader import PyFileConfigLoader |
|
26 | from IPython.config.loader import PyFileConfigLoader | |
28 | from IPython.config.configurable import Configurable |
|
27 | from IPython.config.configurable import Configurable |
@@ -21,7 +21,7 b' import zmq' | |||||
21 | from zmq.devices import ProcessMonitoredQueue |
|
21 | from zmq.devices import ProcessMonitoredQueue | |
22 | # internal: |
|
22 | # internal: | |
23 | from IPython.utils.importstring import import_item |
|
23 | from IPython.utils.importstring import import_item | |
24 | from IPython.utils.traitlets import Int, Str, Instance, List, Bool |
|
24 | from IPython.utils.traitlets import Int, CStr, Instance, List, Bool | |
25 |
|
25 | |||
26 | from .entry_point import signal_children |
|
26 | from .entry_point import signal_children | |
27 | from .hub import Hub, HubFactory |
|
27 | from .hub import Hub, HubFactory | |
@@ -41,7 +41,7 b' class ControllerFactory(HubFactory):' | |||||
41 |
|
41 | |||
42 | # internal |
|
42 | # internal | |
43 | children = List() |
|
43 | children = List() | |
44 | mq_class = Str('zmq.devices.ProcessMonitoredQueue') |
|
44 | mq_class = CStr('zmq.devices.ProcessMonitoredQueue') | |
45 |
|
45 | |||
46 | def _usethreads_changed(self, name, old, new): |
|
46 | def _usethreads_changed(self, name, old, new): | |
47 | self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') |
|
47 | self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') |
@@ -7,7 +7,25 b' from .error import UnmetDependency' | |||||
7 |
|
7 | |||
8 |
|
8 | |||
9 | class depend(object): |
|
9 | class depend(object): | |
10 |
"""Dependency decorator, for use with tasks. |
|
10 | """Dependency decorator, for use with tasks. | |
|
11 | ||||
|
12 | `@depend` lets you define a function for engine dependencies | |||
|
13 | just like you use `apply` for tasks. | |||
|
14 | ||||
|
15 | ||||
|
16 | Examples | |||
|
17 | -------- | |||
|
18 | :: | |||
|
19 | ||||
|
20 | @depend(df, a,b, c=5) | |||
|
21 | def f(m,n,p) | |||
|
22 | ||||
|
23 | view.apply(f, 1,2,3) | |||
|
24 | ||||
|
25 | will call df(a,b,c=5) on the engine, and if it returns False or | |||
|
26 | raises an UnmetDependency error, then the task will not be run | |||
|
27 | and another engine will be tried. | |||
|
28 | """ | |||
11 | def __init__(self, f, *args, **kwargs): |
|
29 | def __init__(self, f, *args, **kwargs): | |
12 | self.f = f |
|
30 | self.f = f | |
13 | self.args = args |
|
31 | self.args = args | |
@@ -39,6 +57,7 b' class dependent(object):' | |||||
39 | return self.func_name |
|
57 | return self.func_name | |
40 |
|
58 | |||
41 | def _require(*names): |
|
59 | def _require(*names): | |
|
60 | """Helper for @require decorator.""" | |||
42 | for name in names: |
|
61 | for name in names: | |
43 | try: |
|
62 | try: | |
44 | __import__(name) |
|
63 | __import__(name) | |
@@ -47,12 +66,35 b' def _require(*names):' | |||||
47 | return True |
|
66 | return True | |
48 |
|
67 | |||
49 | def require(*names): |
|
68 | def require(*names): | |
|
69 | """Simple decorator for requiring names to be importable. | |||
|
70 | ||||
|
71 | Examples | |||
|
72 | -------- | |||
|
73 | ||||
|
74 | In [1]: @require('numpy') | |||
|
75 | ...: def norm(a): | |||
|
76 | ...: import numpy | |||
|
77 | ...: return numpy.linalg.norm(a,2) | |||
|
78 | """ | |||
50 | return depend(_require, *names) |
|
79 | return depend(_require, *names) | |
51 |
|
80 | |||
52 | class Dependency(set): |
|
81 | class Dependency(set): | |
53 | """An object for representing a set of msg_id dependencies. |
|
82 | """An object for representing a set of msg_id dependencies. | |
54 |
|
83 | |||
55 |
Subclassed from set(). |
|
84 | Subclassed from set(). | |
|
85 | ||||
|
86 | Parameters | |||
|
87 | ---------- | |||
|
88 | dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict() | |||
|
89 | The msg_ids to depend on | |||
|
90 | all : bool [default True] | |||
|
91 | Whether the dependency should be considered met when *all* depending tasks have completed | |||
|
92 | or only when *any* have been completed. | |||
|
93 | success_only : bool [default True] | |||
|
94 | Whether to consider only successes for Dependencies, or consider failures as well. | |||
|
95 | If `all=success_only=True`, then this task will fail with an ImpossibleDependency | |||
|
96 | as soon as the first depended-upon task fails. | |||
|
97 | """ | |||
56 |
|
98 | |||
57 | all=True |
|
99 | all=True | |
58 | success_only=True |
|
100 | success_only=True |
@@ -45,15 +45,15 b' We support a subset of mongodb operators:' | |||||
45 | from datetime import datetime |
|
45 | from datetime import datetime | |
46 |
|
46 | |||
47 | filters = { |
|
47 | filters = { | |
48 | '$eq' : lambda a,b: a==b, |
|
|||
49 | '$lt' : lambda a,b: a < b, |
|
48 | '$lt' : lambda a,b: a < b, | |
50 | '$gt' : lambda a,b: b > a, |
|
49 | '$gt' : lambda a,b: b > a, | |
|
50 | '$eq' : lambda a,b: a == b, | |||
|
51 | '$ne' : lambda a,b: a != b, | |||
51 | '$lte': lambda a,b: a <= b, |
|
52 | '$lte': lambda a,b: a <= b, | |
52 | '$gte': lambda a,b: a >= b, |
|
53 | '$gte': lambda a,b: a >= b, | |
53 | '$ne' : lambda a,b: not a==b, |
|
|||
54 | '$in' : lambda a,b: a in b, |
|
54 | '$in' : lambda a,b: a in b, | |
55 | '$nin': lambda a,b: a not in b, |
|
55 | '$nin': lambda a,b: a not in b, | |
56 |
'$all' |
|
56 | '$all': lambda a,b: all([ a in bb for bb in b ]), | |
57 | '$mod': lambda a,b: a%b[0] == b[1], |
|
57 | '$mod': lambda a,b: a%b[0] == b[1], | |
58 | '$exists' : lambda a,b: (b and a is not None) or (a is None and not b) |
|
58 | '$exists' : lambda a,b: (b and a is not None) or (a is None and not b) | |
59 | } |
|
59 | } |
@@ -1,21 +1,17 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """A simple engine that talks to a controller over 0MQ. |
|
2 | """A simple engine that talks to a controller over 0MQ. | |
3 | it handles registration, etc. and launches a kernel |
|
3 | it handles registration, etc. and launches a kernel | |
4 |
connected to the Controller's |
|
4 | connected to the Controller's Schedulers. | |
5 | """ |
|
5 | """ | |
6 | from __future__ import print_function |
|
6 | from __future__ import print_function | |
7 |
|
7 | |||
8 | import logging |
|
|||
9 | import sys |
|
8 | import sys | |
10 | import time |
|
9 | import time | |
11 | import uuid |
|
|||
12 | from pprint import pprint |
|
|||
13 |
|
10 | |||
14 | import zmq |
|
11 | import zmq | |
15 | from zmq.eventloop import ioloop, zmqstream |
|
12 | from zmq.eventloop import ioloop, zmqstream | |
16 |
|
13 | |||
17 | # internal |
|
14 | # internal | |
18 | from IPython.config.configurable import Configurable |
|
|||
19 | from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat |
|
15 | from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat | |
20 | # from IPython.utils.localinterfaces import LOCALHOST |
|
16 | # from IPython.utils.localinterfaces import LOCALHOST | |
21 |
|
17 | |||
@@ -25,10 +21,6 b' from .streamkernel import Kernel' | |||||
25 | from .streamsession import Message |
|
21 | from .streamsession import Message | |
26 | from .util import disambiguate_url |
|
22 | from .util import disambiguate_url | |
27 |
|
23 | |||
28 | def printer(*msg): |
|
|||
29 | # print (self.log.handlers, file=sys.__stdout__) |
|
|||
30 | self.log.info(str(msg)) |
|
|||
31 |
|
||||
32 | class EngineFactory(RegistrationFactory): |
|
24 | class EngineFactory(RegistrationFactory): | |
33 | """IPython engine""" |
|
25 | """IPython engine""" | |
34 |
|
26 |
@@ -3,6 +3,9 b'' | |||||
3 | """Classes and functions for kernel related errors and exceptions.""" |
|
3 | """Classes and functions for kernel related errors and exceptions.""" | |
4 | from __future__ import print_function |
|
4 | from __future__ import print_function | |
5 |
|
5 | |||
|
6 | import sys | |||
|
7 | import traceback | |||
|
8 | ||||
6 | __docformat__ = "restructuredtext en" |
|
9 | __docformat__ = "restructuredtext en" | |
7 |
|
10 | |||
8 | # Tell nose to skip this module |
|
11 | # Tell nose to skip this module | |
@@ -290,3 +293,21 b" def collect_exceptions(rdict_or_list, method='unspecified'):" | |||||
290 | except CompositeError as e: |
|
293 | except CompositeError as e: | |
291 | raise e |
|
294 | raise e | |
292 |
|
295 | |||
|
296 | def wrap_exception(engine_info={}): | |||
|
297 | etype, evalue, tb = sys.exc_info() | |||
|
298 | stb = traceback.format_exception(etype, evalue, tb) | |||
|
299 | exc_content = { | |||
|
300 | 'status' : 'error', | |||
|
301 | 'traceback' : stb, | |||
|
302 | 'ename' : unicode(etype.__name__), | |||
|
303 | 'evalue' : unicode(evalue), | |||
|
304 | 'engine_info' : engine_info | |||
|
305 | } | |||
|
306 | return exc_content | |||
|
307 | ||||
|
308 | def unwrap_exception(content): | |||
|
309 | err = RemoteError(content['ename'], content['evalue'], | |||
|
310 | ''.join(content['traceback']), | |||
|
311 | content.get('engine_info', {})) | |||
|
312 | return err | |||
|
313 |
@@ -31,7 +31,7 b' from IPython.zmq.parallel.entry_point import select_random_ports' | |||||
31 | class LoggingFactory(Configurable): |
|
31 | class LoggingFactory(Configurable): | |
32 | """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait.""" |
|
32 | """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait.""" | |
33 | log = Instance('logging.Logger', ('ZMQ', logging.WARN)) |
|
33 | log = Instance('logging.Logger', ('ZMQ', logging.WARN)) | |
34 |
logname = C |
|
34 | logname = CUnicode('ZMQ') | |
35 | def _logname_changed(self, name, old, new): |
|
35 | def _logname_changed(self, name, old, new): | |
36 | self.log = logging.getLogger(new) |
|
36 | self.log = logging.getLogger(new) | |
37 |
|
37 | |||
@@ -44,8 +44,8 b' class SessionFactory(LoggingFactory):' | |||||
44 | ident = CStr('',config=True) |
|
44 | ident = CStr('',config=True) | |
45 | def _ident_default(self): |
|
45 | def _ident_default(self): | |
46 | return str(uuid.uuid4()) |
|
46 | return str(uuid.uuid4()) | |
47 |
username = |
|
47 | username = CUnicode(os.environ.get('USER','username'),config=True) | |
48 |
exec_key = C |
|
48 | exec_key = CUnicode('',config=True) | |
49 | # not configurable: |
|
49 | # not configurable: | |
50 | context = Instance('zmq.Context', (), {}) |
|
50 | context = Instance('zmq.Context', (), {}) | |
51 | session = Instance('IPython.zmq.parallel.streamsession.StreamSession') |
|
51 | session = Instance('IPython.zmq.parallel.streamsession.StreamSession') |
@@ -15,7 +15,6 b' and monitors traffic through the various queues.' | |||||
15 | #----------------------------------------------------------------------------- |
|
15 | #----------------------------------------------------------------------------- | |
16 | from __future__ import print_function |
|
16 | from __future__ import print_function | |
17 |
|
17 | |||
18 | import logging |
|
|||
19 | import sys |
|
18 | import sys | |
20 | import time |
|
19 | import time | |
21 | from datetime import datetime |
|
20 | from datetime import datetime | |
@@ -25,16 +24,15 b' from zmq.eventloop import ioloop' | |||||
25 | from zmq.eventloop.zmqstream import ZMQStream |
|
24 | from zmq.eventloop.zmqstream import ZMQStream | |
26 |
|
25 | |||
27 | # internal: |
|
26 | # internal: | |
28 | from IPython.config.configurable import Configurable |
|
|||
29 | from IPython.utils.importstring import import_item |
|
27 | from IPython.utils.importstring import import_item | |
30 | from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool |
|
28 | from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool | |
31 |
|
29 | |||
32 | from .entry_point import select_random_ports |
|
30 | from .entry_point import select_random_ports | |
33 | from .factory import RegistrationFactory, LoggingFactory |
|
31 | from .factory import RegistrationFactory, LoggingFactory | |
34 |
|
32 | |||
|
33 | from . import error | |||
35 | from .heartmonitor import HeartMonitor |
|
34 | from .heartmonitor import HeartMonitor | |
36 | from .streamsession import Message, wrap_exception, ISO8601 |
|
35 | from .util import validate_url_container, ISO8601 | |
37 | from .util import validate_url_container |
|
|||
38 |
|
36 | |||
39 | try: |
|
37 | try: | |
40 | from pymongo.binary import Binary |
|
38 | from pymongo.binary import Binary | |
@@ -491,7 +489,7 b' class Hub(LoggingFactory):' | |||||
491 | try: |
|
489 | try: | |
492 | msg = self.session.unpack_message(msg, content=True) |
|
490 | msg = self.session.unpack_message(msg, content=True) | |
493 | except: |
|
491 | except: | |
494 | content = wrap_exception() |
|
492 | content = error.wrap_exception() | |
495 | self.log.error("Bad Client Message: %s"%msg, exc_info=True) |
|
493 | self.log.error("Bad Client Message: %s"%msg, exc_info=True) | |
496 | self.session.send(self.clientele, "hub_error", ident=client_id, |
|
494 | self.session.send(self.clientele, "hub_error", ident=client_id, | |
497 | content=content) |
|
495 | content=content) | |
@@ -505,7 +503,7 b' class Hub(LoggingFactory):' | |||||
505 | try: |
|
503 | try: | |
506 | assert handler is not None, "Bad Message Type: %s"%msg_type |
|
504 | assert handler is not None, "Bad Message Type: %s"%msg_type | |
507 | except: |
|
505 | except: | |
508 | content = wrap_exception() |
|
506 | content = error.wrap_exception() | |
509 | self.log.error("Bad Message Type: %s"%msg_type, exc_info=True) |
|
507 | self.log.error("Bad Message Type: %s"%msg_type, exc_info=True) | |
510 | self.session.send(self.clientele, "hub_error", ident=client_id, |
|
508 | self.session.send(self.clientele, "hub_error", ident=client_id, | |
511 | content=content) |
|
509 | content=content) | |
@@ -802,14 +800,14 b' class Hub(LoggingFactory):' | |||||
802 | try: |
|
800 | try: | |
803 | raise KeyError("queue_id %r in use"%queue) |
|
801 | raise KeyError("queue_id %r in use"%queue) | |
804 | except: |
|
802 | except: | |
805 | content = wrap_exception() |
|
803 | content = error.wrap_exception() | |
806 | self.log.error("queue_id %r in use"%queue, exc_info=True) |
|
804 | self.log.error("queue_id %r in use"%queue, exc_info=True) | |
807 | elif heart in self.hearts: # need to check unique hearts? |
|
805 | elif heart in self.hearts: # need to check unique hearts? | |
808 | try: |
|
806 | try: | |
809 | raise KeyError("heart_id %r in use"%heart) |
|
807 | raise KeyError("heart_id %r in use"%heart) | |
810 | except: |
|
808 | except: | |
811 | self.log.error("heart_id %r in use"%heart, exc_info=True) |
|
809 | self.log.error("heart_id %r in use"%heart, exc_info=True) | |
812 | content = wrap_exception() |
|
810 | content = error.wrap_exception() | |
813 | else: |
|
811 | else: | |
814 | for h, pack in self.incoming_registrations.iteritems(): |
|
812 | for h, pack in self.incoming_registrations.iteritems(): | |
815 | if heart == h: |
|
813 | if heart == h: | |
@@ -817,14 +815,14 b' class Hub(LoggingFactory):' | |||||
817 | raise KeyError("heart_id %r in use"%heart) |
|
815 | raise KeyError("heart_id %r in use"%heart) | |
818 | except: |
|
816 | except: | |
819 | self.log.error("heart_id %r in use"%heart, exc_info=True) |
|
817 | self.log.error("heart_id %r in use"%heart, exc_info=True) | |
820 | content = wrap_exception() |
|
818 | content = error.wrap_exception() | |
821 | break |
|
819 | break | |
822 | elif queue == pack[1]: |
|
820 | elif queue == pack[1]: | |
823 | try: |
|
821 | try: | |
824 | raise KeyError("queue_id %r in use"%queue) |
|
822 | raise KeyError("queue_id %r in use"%queue) | |
825 | except: |
|
823 | except: | |
826 | self.log.error("queue_id %r in use"%queue, exc_info=True) |
|
824 | self.log.error("queue_id %r in use"%queue, exc_info=True) | |
827 | content = wrap_exception() |
|
825 | content = error.wrap_exception() | |
828 | break |
|
826 | break | |
829 |
|
827 | |||
830 | msg = self.session.send(self.registrar, "registration_reply", |
|
828 | msg = self.session.send(self.registrar, "registration_reply", | |
@@ -928,7 +926,7 b' class Hub(LoggingFactory):' | |||||
928 | targets = content['targets'] |
|
926 | targets = content['targets'] | |
929 | targets = self._validate_targets(targets) |
|
927 | targets = self._validate_targets(targets) | |
930 | except: |
|
928 | except: | |
931 | content = wrap_exception() |
|
929 | content = error.wrap_exception() | |
932 | self.session.send(self.clientele, "hub_error", |
|
930 | self.session.send(self.clientele, "hub_error", | |
933 | content=content, ident=client_id) |
|
931 | content=content, ident=client_id) | |
934 | return |
|
932 | return | |
@@ -952,7 +950,7 b' class Hub(LoggingFactory):' | |||||
952 | try: |
|
950 | try: | |
953 | targets = self._validate_targets(targets) |
|
951 | targets = self._validate_targets(targets) | |
954 | except: |
|
952 | except: | |
955 | content = wrap_exception() |
|
953 | content = error.wrap_exception() | |
956 | self.session.send(self.clientele, "hub_error", |
|
954 | self.session.send(self.clientele, "hub_error", | |
957 | content=content, ident=client_id) |
|
955 | content=content, ident=client_id) | |
958 | return |
|
956 | return | |
@@ -987,12 +985,12 b' class Hub(LoggingFactory):' | |||||
987 | try: |
|
985 | try: | |
988 | raise IndexError("msg pending: %r"%msg_id) |
|
986 | raise IndexError("msg pending: %r"%msg_id) | |
989 | except: |
|
987 | except: | |
990 | reply = wrap_exception() |
|
988 | reply = error.wrap_exception() | |
991 | else: |
|
989 | else: | |
992 | try: |
|
990 | try: | |
993 | raise IndexError("No such msg: %r"%msg_id) |
|
991 | raise IndexError("No such msg: %r"%msg_id) | |
994 | except: |
|
992 | except: | |
995 | reply = wrap_exception() |
|
993 | reply = error.wrap_exception() | |
996 | break |
|
994 | break | |
997 | eids = content.get('engine_ids', []) |
|
995 | eids = content.get('engine_ids', []) | |
998 | for eid in eids: |
|
996 | for eid in eids: | |
@@ -1000,7 +998,7 b' class Hub(LoggingFactory):' | |||||
1000 | try: |
|
998 | try: | |
1001 | raise IndexError("No such engine: %i"%eid) |
|
999 | raise IndexError("No such engine: %i"%eid) | |
1002 | except: |
|
1000 | except: | |
1003 | reply = wrap_exception() |
|
1001 | reply = error.wrap_exception() | |
1004 | break |
|
1002 | break | |
1005 | msg_ids = self.completed.pop(eid) |
|
1003 | msg_ids = self.completed.pop(eid) | |
1006 | uid = self.engines[eid].queue |
|
1004 | uid = self.engines[eid].queue | |
@@ -1046,7 +1044,7 b' class Hub(LoggingFactory):' | |||||
1046 | try: |
|
1044 | try: | |
1047 | raise KeyError('No such message: '+msg_id) |
|
1045 | raise KeyError('No such message: '+msg_id) | |
1048 | except: |
|
1046 | except: | |
1049 | content = wrap_exception() |
|
1047 | content = error.wrap_exception() | |
1050 | break |
|
1048 | break | |
1051 | self.session.send(self.clientele, "result_reply", content=content, |
|
1049 | self.session.send(self.clientele, "result_reply", content=content, | |
1052 | parent=msg, ident=client_id, |
|
1050 | parent=msg, ident=client_id, |
@@ -102,7 +102,31 b' class RemoteFunction(object):' | |||||
102 |
|
102 | |||
103 |
|
103 | |||
104 | class ParallelFunction(RemoteFunction): |
|
104 | class ParallelFunction(RemoteFunction): | |
105 |
"""Class for mapping a function to sequences. |
|
105 | """Class for mapping a function to sequences. | |
|
106 | ||||
|
107 | This will distribute the sequences according the a mapper, and call | |||
|
108 | the function on each sub-sequence. If called via map, then the function | |||
|
109 | will be called once on each element, rather that each sub-sequence. | |||
|
110 | ||||
|
111 | Parameters | |||
|
112 | ---------- | |||
|
113 | ||||
|
114 | client : Client instance | |||
|
115 | The client to be used to connect to engines | |||
|
116 | f : callable | |||
|
117 | The function to be wrapped into a remote function | |||
|
118 | bound : bool [default: False] | |||
|
119 | Whether the affect the remote namespace when called | |||
|
120 | block : bool [default: None] | |||
|
121 | Whether to wait for results or not. The default behavior is | |||
|
122 | to use the current `block` attribute of `client` | |||
|
123 | targets : valid target list [default: all] | |||
|
124 | The targets on which to execute. | |||
|
125 | balanced : bool | |||
|
126 | Whether to load-balance with the Task scheduler or not | |||
|
127 | chunk_size : int or None | |||
|
128 | The size of chunk to use when breaking up sequences in a load-balanced manner | |||
|
129 | """ | |||
106 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None): |
|
130 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None): | |
107 | super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) |
|
131 | super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) | |
108 | self.chunk_size = chunk_size |
|
132 | self.chunk_size = chunk_size | |
@@ -164,7 +188,11 b' class ParallelFunction(RemoteFunction):' | |||||
164 | return r |
|
188 | return r | |
165 |
|
189 | |||
166 | def map(self, *sequences): |
|
190 | def map(self, *sequences): | |
167 |
"""call a function on each element of a sequence remotely. |
|
191 | """call a function on each element of a sequence remotely. | |
|
192 | This should behave very much like the builtin map, but return an AsyncMapResult | |||
|
193 | if self.block is False. | |||
|
194 | """ | |||
|
195 | # set _map as a flag for use inside self.__call__ | |||
168 | self._map = True |
|
196 | self._map = True | |
169 | try: |
|
197 | try: | |
170 | ret = self.__call__(*sequences) |
|
198 | ret = self.__call__(*sequences) | |
@@ -172,3 +200,4 b' class ParallelFunction(RemoteFunction):' | |||||
172 | del self._map |
|
200 | del self._map | |
173 | return ret |
|
201 | return ret | |
174 |
|
202 | |||
|
203 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] No newline at end of file |
@@ -31,7 +31,6 b' from IPython.external.decorator import decorator' | |||||
31 | from IPython.utils.traitlets import Instance, Dict, List, Set |
|
31 | from IPython.utils.traitlets import Instance, Dict, List, Set | |
32 |
|
32 | |||
33 | from . import error |
|
33 | from . import error | |
34 | from . import streamsession as ss |
|
|||
35 | from .dependency import Dependency |
|
34 | from .dependency import Dependency | |
36 | from .entry_point import connect_logger, local_logger |
|
35 | from .entry_point import connect_logger, local_logger | |
37 | from .factory import SessionFactory |
|
36 | from .factory import SessionFactory | |
@@ -237,7 +236,7 b' class TaskScheduler(SessionFactory):' | |||||
237 | try: |
|
236 | try: | |
238 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) |
|
237 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) | |
239 | except: |
|
238 | except: | |
240 |
content = |
|
239 | content = error.wrap_exception() | |
241 | msg = self.session.send(self.client_stream, 'apply_reply', content, |
|
240 | msg = self.session.send(self.client_stream, 'apply_reply', content, | |
242 | parent=parent, ident=idents) |
|
241 | parent=parent, ident=idents) | |
243 | self.session.send(self.mon_stream, msg, ident=['outtask']+idents) |
|
242 | self.session.send(self.mon_stream, msg, ident=['outtask']+idents) | |
@@ -340,7 +339,7 b' class TaskScheduler(SessionFactory):' | |||||
340 | try: |
|
339 | try: | |
341 | raise why() |
|
340 | raise why() | |
342 | except: |
|
341 | except: | |
343 |
content = |
|
342 | content = error.wrap_exception() | |
344 |
|
343 | |||
345 | self.all_done.add(msg_id) |
|
344 | self.all_done.add(msg_id) | |
346 | self.all_failed.add(msg_id) |
|
345 | self.all_failed.add(msg_id) |
@@ -9,13 +9,9 b' Kernel adapted from kernel.py to use ZMQ Streams' | |||||
9 |
|
9 | |||
10 | # Standard library imports. |
|
10 | # Standard library imports. | |
11 | from __future__ import print_function |
|
11 | from __future__ import print_function | |
12 | import __builtin__ |
|
|||
13 |
|
12 | |||
14 | import logging |
|
|||
15 | import os |
|
|||
16 | import sys |
|
13 | import sys | |
17 | import time |
|
14 | import time | |
18 | import traceback |
|
|||
19 |
|
15 | |||
20 | from code import CommandCompiler |
|
16 | from code import CommandCompiler | |
21 | from datetime import datetime |
|
17 | from datetime import datetime | |
@@ -28,16 +24,17 b' from zmq.eventloop import ioloop, zmqstream' | |||||
28 |
|
24 | |||
29 | # Local imports. |
|
25 | # Local imports. | |
30 | from IPython.core import ultratb |
|
26 | from IPython.core import ultratb | |
31 |
from IPython.utils.traitlets import |
|
27 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str | |
32 | from IPython.zmq.completer import KernelCompleter |
|
28 | from IPython.zmq.completer import KernelCompleter | |
33 | from IPython.zmq.iostream import OutStream |
|
29 | from IPython.zmq.iostream import OutStream | |
34 | from IPython.zmq.displayhook import DisplayHook |
|
30 | from IPython.zmq.displayhook import DisplayHook | |
35 |
|
31 | |||
36 | from . import heartmonitor |
|
32 | from . import heartmonitor | |
37 | from .client import Client |
|
33 | from .client import Client | |
|
34 | from .error import wrap_exception | |||
38 | from .factory import SessionFactory |
|
35 | from .factory import SessionFactory | |
39 |
from .streamsession import StreamSession |
|
36 | from .streamsession import StreamSession | |
40 | unpack_apply_message, ISO8601, wrap_exception |
|
37 | from .util import serialize_object, unpack_apply_message, ISO8601 | |
41 |
|
38 | |||
42 | def printer(*args): |
|
39 | def printer(*args): | |
43 | pprint(args, stream=sys.__stdout__) |
|
40 | pprint(args, stream=sys.__stdout__) |
@@ -5,8 +5,6 b'' | |||||
5 |
|
5 | |||
6 | import os |
|
6 | import os | |
7 | import pprint |
|
7 | import pprint | |
8 | import sys |
|
|||
9 | import traceback |
|
|||
10 | import uuid |
|
8 | import uuid | |
11 | from datetime import datetime |
|
9 | from datetime import datetime | |
12 |
|
10 | |||
@@ -21,10 +19,7 b' import zmq' | |||||
21 | from zmq.utils import jsonapi |
|
19 | from zmq.utils import jsonapi | |
22 | from zmq.eventloop.zmqstream import ZMQStream |
|
20 | from zmq.eventloop.zmqstream import ZMQStream | |
23 |
|
21 | |||
24 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence |
|
22 | from .util import ISO8601 | |
25 | from IPython.utils.newserialized import serialize, unserialize |
|
|||
26 |
|
||||
27 | from .error import RemoteError |
|
|||
28 |
|
23 | |||
29 | # packer priority: jsonlib[2], cPickle, simplejson/json, pickle |
|
24 | # packer priority: jsonlib[2], cPickle, simplejson/json, pickle | |
30 | json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__ |
|
25 | json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__ | |
@@ -66,26 +61,6 b' else:' | |||||
66 |
|
61 | |||
67 |
|
62 | |||
68 | DELIM="<IDS|MSG>" |
|
63 | DELIM="<IDS|MSG>" | |
69 | ISO8601="%Y-%m-%dT%H:%M:%S.%f" |
|
|||
70 |
|
||||
71 | def wrap_exception(engine_info={}): |
|
|||
72 | etype, evalue, tb = sys.exc_info() |
|
|||
73 | stb = traceback.format_exception(etype, evalue, tb) |
|
|||
74 | exc_content = { |
|
|||
75 | 'status' : 'error', |
|
|||
76 | 'traceback' : stb, |
|
|||
77 | 'ename' : unicode(etype.__name__), |
|
|||
78 | 'evalue' : unicode(evalue), |
|
|||
79 | 'engine_info' : engine_info |
|
|||
80 | } |
|
|||
81 | return exc_content |
|
|||
82 |
|
||||
83 | def unwrap_exception(content): |
|
|||
84 | err = RemoteError(content['ename'], content['evalue'], |
|
|||
85 | ''.join(content['traceback']), |
|
|||
86 | content.get('engine_info', {})) |
|
|||
87 | return err |
|
|||
88 |
|
||||
89 |
|
64 | |||
90 | class Message(object): |
|
65 | class Message(object): | |
91 | """A simple message object that maps dict keys to attributes. |
|
66 | """A simple message object that maps dict keys to attributes. | |
@@ -140,146 +115,6 b' def extract_header(msg_or_header):' | |||||
140 | h = dict(h) |
|
115 | h = dict(h) | |
141 | return h |
|
116 | return h | |
142 |
|
117 | |||
143 | def rekey(dikt): |
|
|||
144 | """Rekey a dict that has been forced to use str keys where there should be |
|
|||
145 | ints by json. This belongs in the jsonutil added by fperez.""" |
|
|||
146 | for k in dikt.iterkeys(): |
|
|||
147 | if isinstance(k, str): |
|
|||
148 | ik=fk=None |
|
|||
149 | try: |
|
|||
150 | ik = int(k) |
|
|||
151 | except ValueError: |
|
|||
152 | try: |
|
|||
153 | fk = float(k) |
|
|||
154 | except ValueError: |
|
|||
155 | continue |
|
|||
156 | if ik is not None: |
|
|||
157 | nk = ik |
|
|||
158 | else: |
|
|||
159 | nk = fk |
|
|||
160 | if nk in dikt: |
|
|||
161 | raise KeyError("already have key %r"%nk) |
|
|||
162 | dikt[nk] = dikt.pop(k) |
|
|||
163 | return dikt |
|
|||
164 |
|
||||
165 | def serialize_object(obj, threshold=64e-6): |
|
|||
166 | """Serialize an object into a list of sendable buffers. |
|
|||
167 |
|
||||
168 | Parameters |
|
|||
169 | ---------- |
|
|||
170 |
|
||||
171 | obj : object |
|
|||
172 | The object to be serialized |
|
|||
173 | threshold : float |
|
|||
174 | The threshold for not double-pickling the content. |
|
|||
175 |
|
||||
176 |
|
||||
177 | Returns |
|
|||
178 | ------- |
|
|||
179 | ('pmd', [bufs]) : |
|
|||
180 | where pmd is the pickled metadata wrapper, |
|
|||
181 | bufs is a list of data buffers |
|
|||
182 | """ |
|
|||
183 | databuffers = [] |
|
|||
184 | if isinstance(obj, (list, tuple)): |
|
|||
185 | clist = canSequence(obj) |
|
|||
186 | slist = map(serialize, clist) |
|
|||
187 | for s in slist: |
|
|||
188 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
189 | databuffers.append(s.getData()) |
|
|||
190 | s.data = None |
|
|||
191 | return pickle.dumps(slist,-1), databuffers |
|
|||
192 | elif isinstance(obj, dict): |
|
|||
193 | sobj = {} |
|
|||
194 | for k in sorted(obj.iterkeys()): |
|
|||
195 | s = serialize(can(obj[k])) |
|
|||
196 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
197 | databuffers.append(s.getData()) |
|
|||
198 | s.data = None |
|
|||
199 | sobj[k] = s |
|
|||
200 | return pickle.dumps(sobj,-1),databuffers |
|
|||
201 | else: |
|
|||
202 | s = serialize(can(obj)) |
|
|||
203 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: |
|
|||
204 | databuffers.append(s.getData()) |
|
|||
205 | s.data = None |
|
|||
206 | return pickle.dumps(s,-1),databuffers |
|
|||
207 |
|
||||
208 |
|
||||
209 | def unserialize_object(bufs): |
|
|||
210 | """reconstruct an object serialized by serialize_object from data buffers.""" |
|
|||
211 | bufs = list(bufs) |
|
|||
212 | sobj = pickle.loads(bufs.pop(0)) |
|
|||
213 | if isinstance(sobj, (list, tuple)): |
|
|||
214 | for s in sobj: |
|
|||
215 | if s.data is None: |
|
|||
216 | s.data = bufs.pop(0) |
|
|||
217 | return uncanSequence(map(unserialize, sobj)), bufs |
|
|||
218 | elif isinstance(sobj, dict): |
|
|||
219 | newobj = {} |
|
|||
220 | for k in sorted(sobj.iterkeys()): |
|
|||
221 | s = sobj[k] |
|
|||
222 | if s.data is None: |
|
|||
223 | s.data = bufs.pop(0) |
|
|||
224 | newobj[k] = uncan(unserialize(s)) |
|
|||
225 | return newobj, bufs |
|
|||
226 | else: |
|
|||
227 | if sobj.data is None: |
|
|||
228 | sobj.data = bufs.pop(0) |
|
|||
229 | return uncan(unserialize(sobj)), bufs |
|
|||
230 |
|
||||
231 | def pack_apply_message(f, args, kwargs, threshold=64e-6): |
|
|||
232 | """pack up a function, args, and kwargs to be sent over the wire |
|
|||
233 | as a series of buffers. Any object whose data is larger than `threshold` |
|
|||
234 | will not have their data copied (currently only numpy arrays support zero-copy)""" |
|
|||
235 | msg = [pickle.dumps(can(f),-1)] |
|
|||
236 | databuffers = [] # for large objects |
|
|||
237 | sargs, bufs = serialize_object(args,threshold) |
|
|||
238 | msg.append(sargs) |
|
|||
239 | databuffers.extend(bufs) |
|
|||
240 | skwargs, bufs = serialize_object(kwargs,threshold) |
|
|||
241 | msg.append(skwargs) |
|
|||
242 | databuffers.extend(bufs) |
|
|||
243 | msg.extend(databuffers) |
|
|||
244 | return msg |
|
|||
245 |
|
||||
246 | def unpack_apply_message(bufs, g=None, copy=True): |
|
|||
247 | """unpack f,args,kwargs from buffers packed by pack_apply_message() |
|
|||
248 | Returns: original f,args,kwargs""" |
|
|||
249 | bufs = list(bufs) # allow us to pop |
|
|||
250 | assert len(bufs) >= 3, "not enough buffers!" |
|
|||
251 | if not copy: |
|
|||
252 | for i in range(3): |
|
|||
253 | bufs[i] = bufs[i].bytes |
|
|||
254 | cf = pickle.loads(bufs.pop(0)) |
|
|||
255 | sargs = list(pickle.loads(bufs.pop(0))) |
|
|||
256 | skwargs = dict(pickle.loads(bufs.pop(0))) |
|
|||
257 | # print sargs, skwargs |
|
|||
258 | f = uncan(cf, g) |
|
|||
259 | for sa in sargs: |
|
|||
260 | if sa.data is None: |
|
|||
261 | m = bufs.pop(0) |
|
|||
262 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): |
|
|||
263 | if copy: |
|
|||
264 | sa.data = buffer(m) |
|
|||
265 | else: |
|
|||
266 | sa.data = m.buffer |
|
|||
267 | else: |
|
|||
268 | if copy: |
|
|||
269 | sa.data = m |
|
|||
270 | else: |
|
|||
271 | sa.data = m.bytes |
|
|||
272 |
|
||||
273 | args = uncanSequence(map(unserialize, sargs), g) |
|
|||
274 | kwargs = {} |
|
|||
275 | for k in sorted(skwargs.iterkeys()): |
|
|||
276 | sa = skwargs[k] |
|
|||
277 | if sa.data is None: |
|
|||
278 | sa.data = bufs.pop(0) |
|
|||
279 | kwargs[k] = uncan(unserialize(sa), g) |
|
|||
280 |
|
||||
281 | return f,args,kwargs |
|
|||
282 |
|
||||
283 | class StreamSession(object): |
|
118 | class StreamSession(object): | |
284 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" |
|
119 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" | |
285 | debug=False |
|
120 | debug=False |
@@ -47,24 +47,24 b' class TestSession(SessionTestCase):' | |||||
47 | self.assertEquals(s.username, 'carrot') |
|
47 | self.assertEquals(s.username, 'carrot') | |
48 |
|
48 | |||
49 |
|
49 | |||
50 | def test_rekey(self): |
|
50 | # def test_rekey(self): | |
51 | """rekeying dict around json str keys""" |
|
51 | # """rekeying dict around json str keys""" | |
52 | d = {'0': uuid.uuid4(), 0:uuid.uuid4()} |
|
52 | # d = {'0': uuid.uuid4(), 0:uuid.uuid4()} | |
53 | self.assertRaises(KeyError, ss.rekey, d) |
|
53 | # self.assertRaises(KeyError, ss.rekey, d) | |
54 |
|
54 | # | ||
55 | d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()} |
|
55 | # d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()} | |
56 | d2 = {0:d['0'],1:d[1],'asdf':d['asdf']} |
|
56 | # d2 = {0:d['0'],1:d[1],'asdf':d['asdf']} | |
57 | rd = ss.rekey(d) |
|
57 | # rd = ss.rekey(d) | |
58 | self.assertEquals(d2,rd) |
|
58 | # self.assertEquals(d2,rd) | |
59 |
|
59 | # | ||
60 | d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()} |
|
60 | # d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()} | |
61 | d2 = {1.5:d['1.5'],1:d['1']} |
|
61 | # d2 = {1.5:d['1.5'],1:d['1']} | |
62 | rd = ss.rekey(d) |
|
62 | # rd = ss.rekey(d) | |
63 | self.assertEquals(d2,rd) |
|
63 | # self.assertEquals(d2,rd) | |
64 |
|
64 | # | ||
65 | d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()} |
|
65 | # d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()} | |
66 | self.assertRaises(KeyError, ss.rekey, d) |
|
66 | # self.assertRaises(KeyError, ss.rekey, d) | |
67 |
|
67 | # | ||
68 | def test_unique_msg_ids(self): |
|
68 | def test_unique_msg_ids(self): | |
69 | """test that messages receive unique ids""" |
|
69 | """test that messages receive unique ids""" | |
70 | ids = set() |
|
70 | ids = set() |
@@ -1,7 +1,20 b'' | |||||
1 | """some generic utilities""" |
|
1 | """some generic utilities for dealing with classes, urls, and serialization""" | |
2 | import re |
|
2 | import re | |
3 | import socket |
|
3 | import socket | |
4 |
|
4 | |||
|
5 | try: | |||
|
6 | import cPickle | |||
|
7 | pickle = cPickle | |||
|
8 | except: | |||
|
9 | cPickle = None | |||
|
10 | import pickle | |||
|
11 | ||||
|
12 | ||||
|
13 | from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence | |||
|
14 | from IPython.utils.newserialized import serialize, unserialize | |||
|
15 | ||||
|
16 | ISO8601="%Y-%m-%dT%H:%M:%S.%f" | |||
|
17 | ||||
5 | class ReverseDict(dict): |
|
18 | class ReverseDict(dict): | |
6 | """simple double-keyed subset of dict methods.""" |
|
19 | """simple double-keyed subset of dict methods.""" | |
7 |
|
20 | |||
@@ -33,7 +46,6 b' class ReverseDict(dict):' | |||||
33 | return self[key] |
|
46 | return self[key] | |
34 | except KeyError: |
|
47 | except KeyError: | |
35 | return default |
|
48 | return default | |
36 |
|
||||
37 |
|
49 | |||
38 | def validate_url(url): |
|
50 | def validate_url(url): | |
39 | """validate a url for zeromq""" |
|
51 | """validate a url for zeromq""" | |
@@ -117,3 +129,143 b' def disambiguate_url(url, location=None):' | |||||
117 | return "%s://%s:%s"%(proto,ip,port) |
|
129 | return "%s://%s:%s"%(proto,ip,port) | |
118 |
|
130 | |||
119 |
|
131 | |||
|
132 | def rekey(dikt): | |||
|
133 | """Rekey a dict that has been forced to use str keys where there should be | |||
|
134 | ints by json. This belongs in the jsonutil added by fperez.""" | |||
|
135 | for k in dikt.iterkeys(): | |||
|
136 | if isinstance(k, str): | |||
|
137 | ik=fk=None | |||
|
138 | try: | |||
|
139 | ik = int(k) | |||
|
140 | except ValueError: | |||
|
141 | try: | |||
|
142 | fk = float(k) | |||
|
143 | except ValueError: | |||
|
144 | continue | |||
|
145 | if ik is not None: | |||
|
146 | nk = ik | |||
|
147 | else: | |||
|
148 | nk = fk | |||
|
149 | if nk in dikt: | |||
|
150 | raise KeyError("already have key %r"%nk) | |||
|
151 | dikt[nk] = dikt.pop(k) | |||
|
152 | return dikt | |||
|
153 | ||||
|
154 | def serialize_object(obj, threshold=64e-6): | |||
|
155 | """Serialize an object into a list of sendable buffers. | |||
|
156 | ||||
|
157 | Parameters | |||
|
158 | ---------- | |||
|
159 | ||||
|
160 | obj : object | |||
|
161 | The object to be serialized | |||
|
162 | threshold : float | |||
|
163 | The threshold for not double-pickling the content. | |||
|
164 | ||||
|
165 | ||||
|
166 | Returns | |||
|
167 | ------- | |||
|
168 | ('pmd', [bufs]) : | |||
|
169 | where pmd is the pickled metadata wrapper, | |||
|
170 | bufs is a list of data buffers | |||
|
171 | """ | |||
|
172 | databuffers = [] | |||
|
173 | if isinstance(obj, (list, tuple)): | |||
|
174 | clist = canSequence(obj) | |||
|
175 | slist = map(serialize, clist) | |||
|
176 | for s in slist: | |||
|
177 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |||
|
178 | databuffers.append(s.getData()) | |||
|
179 | s.data = None | |||
|
180 | return pickle.dumps(slist,-1), databuffers | |||
|
181 | elif isinstance(obj, dict): | |||
|
182 | sobj = {} | |||
|
183 | for k in sorted(obj.iterkeys()): | |||
|
184 | s = serialize(can(obj[k])) | |||
|
185 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |||
|
186 | databuffers.append(s.getData()) | |||
|
187 | s.data = None | |||
|
188 | sobj[k] = s | |||
|
189 | return pickle.dumps(sobj,-1),databuffers | |||
|
190 | else: | |||
|
191 | s = serialize(can(obj)) | |||
|
192 | if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold: | |||
|
193 | databuffers.append(s.getData()) | |||
|
194 | s.data = None | |||
|
195 | return pickle.dumps(s,-1),databuffers | |||
|
196 | ||||
|
197 | ||||
|
198 | def unserialize_object(bufs): | |||
|
199 | """reconstruct an object serialized by serialize_object from data buffers.""" | |||
|
200 | bufs = list(bufs) | |||
|
201 | sobj = pickle.loads(bufs.pop(0)) | |||
|
202 | if isinstance(sobj, (list, tuple)): | |||
|
203 | for s in sobj: | |||
|
204 | if s.data is None: | |||
|
205 | s.data = bufs.pop(0) | |||
|
206 | return uncanSequence(map(unserialize, sobj)), bufs | |||
|
207 | elif isinstance(sobj, dict): | |||
|
208 | newobj = {} | |||
|
209 | for k in sorted(sobj.iterkeys()): | |||
|
210 | s = sobj[k] | |||
|
211 | if s.data is None: | |||
|
212 | s.data = bufs.pop(0) | |||
|
213 | newobj[k] = uncan(unserialize(s)) | |||
|
214 | return newobj, bufs | |||
|
215 | else: | |||
|
216 | if sobj.data is None: | |||
|
217 | sobj.data = bufs.pop(0) | |||
|
218 | return uncan(unserialize(sobj)), bufs | |||
|
219 | ||||
|
220 | def pack_apply_message(f, args, kwargs, threshold=64e-6): | |||
|
221 | """pack up a function, args, and kwargs to be sent over the wire | |||
|
222 | as a series of buffers. Any object whose data is larger than `threshold` | |||
|
223 | will not have their data copied (currently only numpy arrays support zero-copy)""" | |||
|
224 | msg = [pickle.dumps(can(f),-1)] | |||
|
225 | databuffers = [] # for large objects | |||
|
226 | sargs, bufs = serialize_object(args,threshold) | |||
|
227 | msg.append(sargs) | |||
|
228 | databuffers.extend(bufs) | |||
|
229 | skwargs, bufs = serialize_object(kwargs,threshold) | |||
|
230 | msg.append(skwargs) | |||
|
231 | databuffers.extend(bufs) | |||
|
232 | msg.extend(databuffers) | |||
|
233 | return msg | |||
|
234 | ||||
|
235 | def unpack_apply_message(bufs, g=None, copy=True): | |||
|
236 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |||
|
237 | Returns: original f,args,kwargs""" | |||
|
238 | bufs = list(bufs) # allow us to pop | |||
|
239 | assert len(bufs) >= 3, "not enough buffers!" | |||
|
240 | if not copy: | |||
|
241 | for i in range(3): | |||
|
242 | bufs[i] = bufs[i].bytes | |||
|
243 | cf = pickle.loads(bufs.pop(0)) | |||
|
244 | sargs = list(pickle.loads(bufs.pop(0))) | |||
|
245 | skwargs = dict(pickle.loads(bufs.pop(0))) | |||
|
246 | # print sargs, skwargs | |||
|
247 | f = uncan(cf, g) | |||
|
248 | for sa in sargs: | |||
|
249 | if sa.data is None: | |||
|
250 | m = bufs.pop(0) | |||
|
251 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |||
|
252 | if copy: | |||
|
253 | sa.data = buffer(m) | |||
|
254 | else: | |||
|
255 | sa.data = m.buffer | |||
|
256 | else: | |||
|
257 | if copy: | |||
|
258 | sa.data = m | |||
|
259 | else: | |||
|
260 | sa.data = m.bytes | |||
|
261 | ||||
|
262 | args = uncanSequence(map(unserialize, sargs), g) | |||
|
263 | kwargs = {} | |||
|
264 | for k in sorted(skwargs.iterkeys()): | |||
|
265 | sa = skwargs[k] | |||
|
266 | if sa.data is None: | |||
|
267 | sa.data = bufs.pop(0) | |||
|
268 | kwargs[k] = uncan(unserialize(sa), g) | |||
|
269 | ||||
|
270 | return f,args,kwargs | |||
|
271 |
@@ -1,4 +1,4 b'' | |||||
1 | """Views of remote engines""" |
|
1 | """Views of remote engines.""" | |
2 | #----------------------------------------------------------------------------- |
|
2 | #----------------------------------------------------------------------------- | |
3 | # Copyright (C) 2010 The IPython Development Team |
|
3 | # Copyright (C) 2010 The IPython Development Team | |
4 | # |
|
4 | # | |
@@ -11,7 +11,7 b'' | |||||
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
13 | from IPython.testing import decorators as testdec |
|
13 | from IPython.testing import decorators as testdec | |
14 | from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance |
|
14 | from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance | |
15 |
|
15 | |||
16 | from IPython.external.decorator import decorator |
|
16 | from IPython.external.decorator import decorator | |
17 |
|
17 | |||
@@ -82,7 +82,7 b' class View(HasTraits):' | |||||
82 | _ntargets = Int(1) |
|
82 | _ntargets = Int(1) | |
83 | _balanced = Bool(False) |
|
83 | _balanced = Bool(False) | |
84 | _default_names = List(['block', 'bound']) |
|
84 | _default_names = List(['block', 'bound']) | |
85 |
_targets = |
|
85 | _targets = Any() | |
86 |
|
86 | |||
87 | def __init__(self, client=None, targets=None): |
|
87 | def __init__(self, client=None, targets=None): | |
88 | super(View, self).__init__(client=client) |
|
88 | super(View, self).__init__(client=client) | |
@@ -655,3 +655,4 b' class LoadBalancedView(View):' | |||||
655 | chunk_size=chunk_size) |
|
655 | chunk_size=chunk_size) | |
656 | return pf.map(*sequences) |
|
656 | return pf.map(*sequences) | |
657 |
|
657 | |||
|
658 | __all__ = ['LoadBalancedView', 'DirectView'] No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now