copyright statements
MinRK
@@ -1,322 +1,322 b''
1 1 """AsyncResult objects for the client"""
2 2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010-2011 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import time
14 14
15 15 from IPython.external.decorator import decorator
16 16 from . import error
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Classes
20 20 #-----------------------------------------------------------------------------
21 21
22 22 @decorator
23 23 def check_ready(f, self, *args, **kwargs):
24 24 """Call wait(0) to sync state prior to calling the method."""
25 25 self.wait(0)
26 26 if not self._ready:
27 27 raise error.TimeoutError("result not ready")
28 28 return f(self, *args, **kwargs)
29 29
30 30 class AsyncResult(object):
31 31 """Class for representing results of non-blocking calls.
32 32
33 33 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
34 34 """
35 35
36 36 msg_ids = None
37 37 _targets = None
38 38 _tracker = None
39 39
40 40 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
41 41 self._client = client
42 42 if isinstance(msg_ids, basestring):
43 43 msg_ids = [msg_ids]
44 44 self.msg_ids = msg_ids
45 45 self._fname=fname
46 46 self._targets = targets
47 47 self._tracker = tracker
48 48 self._ready = False
49 49 self._success = None
50 50 self._single_result = len(msg_ids) == 1
51 51
52 52 def __repr__(self):
53 53 if self._ready:
54 54 return "<%s: finished>"%(self.__class__.__name__)
55 55 else:
56 56 return "<%s: %s>"%(self.__class__.__name__,self._fname)
57 57
58 58
59 59 def _reconstruct_result(self, res):
60 60 """Reconstruct our result from actual result list (always a list)
61 61
62 62 Override me in subclasses for turning a list of results
63 63 into the expected form.
64 64 """
65 65 if self._single_result:
66 66 return res[0]
67 67 else:
68 68 return res
69 69
70 70 def get(self, timeout=-1):
71 71 """Return the result when it arrives.
72 72
73 73 If `timeout` is not ``None`` and the result does not arrive within
74 74 `timeout` seconds then ``TimeoutError`` is raised. If the
75 75 remote call raised an exception then that exception will be reraised
76 76 by get() inside a `RemoteError`.
77 77 """
78 78 if not self.ready():
79 79 self.wait(timeout)
80 80
81 81 if self._ready:
82 82 if self._success:
83 83 return self._result
84 84 else:
85 85 raise self._exception
86 86 else:
87 87 raise error.TimeoutError("Result not ready.")
88 88
89 89 def ready(self):
90 90 """Return whether the call has completed."""
91 91 if not self._ready:
92 92 self.wait(0)
93 93 return self._ready
94 94
95 95 def wait(self, timeout=-1):
96 96 """Wait until the result is available or until `timeout` seconds pass.
97 97
98 98 This method always returns None.
99 99 """
100 100 if self._ready:
101 101 return
102 102 self._ready = self._client.barrier(self.msg_ids, timeout)
103 103 if self._ready:
104 104 try:
105 105 results = map(self._client.results.get, self.msg_ids)
106 106 self._result = results
107 107 if self._single_result:
108 108 r = results[0]
109 109 if isinstance(r, Exception):
110 110 raise r
111 111 else:
112 112 results = error.collect_exceptions(results, self._fname)
113 113 self._result = self._reconstruct_result(results)
114 114 except Exception, e:
115 115 self._exception = e
116 116 self._success = False
117 117 else:
118 118 self._success = True
119 119 finally:
120 120 self._metadata = map(self._client.metadata.get, self.msg_ids)
121 121
122 122
123 123 def successful(self):
124 124 """Return whether the call completed without raising an exception.
125 125
126 126 Will raise ``AssertionError`` if the result is not ready.
127 127 """
128 128 assert self.ready()
129 129 return self._success
130 130
131 131 #----------------------------------------------------------------
132 132 # Extra methods not in mp.pool.AsyncResult
133 133 #----------------------------------------------------------------
134 134
135 135 def get_dict(self, timeout=-1):
136 136 """Get the results as a dict, keyed by engine_id.
137 137
138 138 timeout behavior is described in `get()`.
139 139 """
140 140
141 141 results = self.get(timeout)
142 142 engine_ids = [ md['engine_id'] for md in self._metadata ]
143 143 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
144 144 maxcount = bycount.count(bycount[-1])
145 145 if maxcount > 1:
146 146 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
147 147 maxcount, bycount[-1]))
148 148
149 149 return dict(zip(engine_ids,results))
150 150
151 151 @property
152 152 @check_ready
153 153 def result(self):
154 154 """result property wrapper for `get(timeout=0)`."""
155 155 return self._result
156 156
157 157 # abbreviated alias:
158 158 r = result
159 159
160 160 @property
161 161 @check_ready
162 162 def metadata(self):
163 163 """property for accessing execution metadata."""
164 164 if self._single_result:
165 165 return self._metadata[0]
166 166 else:
167 167 return self._metadata
168 168
169 169 @property
170 170 def result_dict(self):
171 171 """result property as a dict."""
172 172 return self.get_dict(0)
173 173
174 174 def __dict__(self):
175 175 return self.get_dict(0)
176 176
177 177 def abort(self):
178 178 """abort my tasks."""
179 179 assert not self.ready(), "Can't abort, I am already done!"
180 180 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
181 181
182 182 @property
183 183 def sent(self):
184 184 """check whether my messages have been sent"""
185 185 if self._tracker is None:
186 186 return True
187 187 else:
188 188 return self._tracker.done
189 189
190 190 #-------------------------------------
191 191 # dict-access
192 192 #-------------------------------------
193 193
194 194 @check_ready
195 195 def __getitem__(self, key):
196 196 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
197 197 """
198 198 if isinstance(key, int):
199 199 return error.collect_exceptions([self._result[key]], self._fname)[0]
200 200 elif isinstance(key, slice):
201 201 return error.collect_exceptions(self._result[key], self._fname)
202 202 elif isinstance(key, basestring):
203 203 values = [ md[key] for md in self._metadata ]
204 204 if self._single_result:
205 205 return values[0]
206 206 else:
207 207 return values
208 208 else:
209 209 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
210 210
211 211 @check_ready
212 212 def __getattr__(self, key):
213 213 """getattr maps to getitem for convenient attr access to metadata."""
214 214 if key not in self._metadata[0].keys():
215 215 raise AttributeError("%r object has no attribute %r"%(
216 216 self.__class__.__name__, key))
217 217 return self.__getitem__(key)
218 218
219 219 # asynchronous iterator:
220 220 def __iter__(self):
221 221 if self._single_result:
222 222 raise TypeError("AsyncResults with a single result are not iterable.")
223 223 try:
224 224 rlist = self.get(0)
225 225 except error.TimeoutError:
226 226 # wait for each result individually
227 227 for msg_id in self.msg_ids:
228 228 ar = AsyncResult(self._client, msg_id, self._fname)
229 229 yield ar.get()
230 230 else:
231 231 # already done
232 232 for r in rlist:
233 233 yield r
234 234
235 235
236 236
237 237 class AsyncMapResult(AsyncResult):
238 238 """Class for representing results of non-blocking gathers.
239 239
240 240 This will properly reconstruct the gather.
241 241 """
242 242
243 243 def __init__(self, client, msg_ids, mapObject, fname=''):
244 244 AsyncResult.__init__(self, client, msg_ids, fname=fname)
245 245 self._mapObject = mapObject
246 246 self._single_result = False
247 247
248 248 def _reconstruct_result(self, res):
249 249 """Perform the gather on the actual results."""
250 250 return self._mapObject.joinPartitions(res)
251 251
252 252 # asynchronous iterator:
253 253 def __iter__(self):
254 254 try:
255 255 rlist = self.get(0)
256 256 except error.TimeoutError:
257 257 # wait for each result individually
258 258 for msg_id in self.msg_ids:
259 259 ar = AsyncResult(self._client, msg_id, self._fname)
260 260 rlist = ar.get()
261 261 try:
262 262 for r in rlist:
263 263 yield r
264 264 except TypeError:
265 265 # flattened, not a list
266 266 # this could get broken by flattened data that returns iterables
267 267 # but most calls to map do not expose the `flatten` argument
268 268 yield rlist
269 269 else:
270 270 # already done
271 271 for r in rlist:
272 272 yield r
273 273
274 274
275 275 class AsyncHubResult(AsyncResult):
276 276 """Class to wrap pending results that must be requested from the Hub.
277 277
278 278 Note that waiting/polling on these objects requires polling the Hub over the network,
279 279 so use `AsyncHubResult.wait()` sparingly.
280 280 """
281 281
282 282 def wait(self, timeout=-1):
283 283 """wait for result to complete."""
284 284 start = time.time()
285 285 if self._ready:
286 286 return
287 287 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
288 288 local_ready = self._client.barrier(local_ids, timeout)
289 289 if local_ready:
290 290 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
291 291 if not remote_ids:
292 292 self._ready = True
293 293 else:
294 294 rdict = self._client.result_status(remote_ids, status_only=False)
295 295 pending = rdict['pending']
296 296 while pending and (timeout < 0 or time.time() < start+timeout):
297 297 rdict = self._client.result_status(remote_ids, status_only=False)
298 298 pending = rdict['pending']
299 299 if pending:
300 300 time.sleep(0.1)
301 301 if not pending:
302 302 self._ready = True
303 303 if self._ready:
304 304 try:
305 305 results = map(self._client.results.get, self.msg_ids)
306 306 self._result = results
307 307 if self._single_result:
308 308 r = results[0]
309 309 if isinstance(r, Exception):
310 310 raise r
311 311 else:
312 312 results = error.collect_exceptions(results, self._fname)
313 313 self._result = self._reconstruct_result(results)
314 314 except Exception, e:
315 315 self._exception = e
316 316 self._success = False
317 317 else:
318 318 self._success = True
319 319 finally:
320 320 self._metadata = map(self._client.metadata.get, self.msg_ids)
321 321
322 322 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
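A minimal usage sketch of the AsyncResult interface above. The `Client` import path and the exact `apply` signature are assumptions based on this branch's layout; the AsyncResult methods are the ones defined in this file.

    from IPython.zmq.parallel.client import Client  # path assumed

    client = Client()  # connect to a running controller (defaults assumed)

    def double(x):
        return 2 * x

    # a non-blocking apply returns an AsyncResult (signature assumed)
    ar = client.apply(double, args=(21,), block=False)

    ar.ready()           # poll: has the call completed?
    ar.wait(timeout=5)   # block up to 5 seconds; always returns None
    result = ar.get()    # 42, or re-raises the remote error as a RemoteError
    meta = ar.metadata   # execution metadata, once the result is ready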
@@ -1,153 +1,159 b''
1 1 """Dependency utilities"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010-2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
2 8
3 9 from IPython.external.decorator import decorator
4 10
5 11 from .asyncresult import AsyncResult
6 12 from .error import UnmetDependency
7 13
8 14
9 15 class depend(object):
10 16 """Dependency decorator, for use with tasks.
11 17
12 18 `@depend` lets you define a function for engine dependencies
13 19 just like you use `apply` for tasks.
14 20
15 21
16 22 Examples
17 23 --------
18 24 ::
19 25
20 26 @depend(df, a,b, c=5)
21 27 def f(m,n,p):
22 28
23 29 view.apply(f, 1,2,3)
24 30
25 31 will call df(a,b,c=5) on the engine, and if it returns False or
26 32 raises an UnmetDependency error, then the task will not be run
27 33 and another engine will be tried.
28 34 """
29 35 def __init__(self, f, *args, **kwargs):
30 36 self.f = f
31 37 self.args = args
32 38 self.kwargs = kwargs
33 39
34 40 def __call__(self, f):
35 41 return dependent(f, self.f, *self.args, **self.kwargs)
36 42
37 43 class dependent(object):
38 44 """A function that depends on another function.
39 45 This is an object to prevent the closure used
40 46 in traditional decorators, which are not picklable.
41 47 """
42 48
43 49 def __init__(self, f, df, *dargs, **dkwargs):
44 50 self.f = f
45 51 self.func_name = getattr(f, '__name__', 'f')
46 52 self.df = df
47 53 self.dargs = dargs
48 54 self.dkwargs = dkwargs
49 55
50 56 def __call__(self, *args, **kwargs):
51 57 if self.df(*self.dargs, **self.dkwargs) is False:
52 58 raise UnmetDependency()
53 59 return self.f(*args, **kwargs)
54 60
55 61 @property
56 62 def __name__(self):
57 63 return self.func_name
58 64
59 65 def _require(*names):
60 66 """Helper for @require decorator."""
61 67 for name in names:
62 68 try:
63 69 __import__(name)
64 70 except ImportError:
65 71 return False
66 72 return True
67 73
68 74 def require(*names):
69 75 """Simple decorator for requiring names to be importable.
70 76
71 77 Examples
72 78 --------
73 79
74 80 In [1]: @require('numpy')
75 81 ...: def norm(a):
76 82 ...: import numpy
77 83 ...: return numpy.linalg.norm(a,2)
78 84 """
79 85 return depend(_require, *names)
80 86
81 87 class Dependency(set):
82 88 """An object for representing a set of msg_id dependencies.
83 89
84 90 Subclassed from set().
85 91
86 92 Parameters
87 93 ----------
88 94 dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
89 95 The msg_ids to depend on
90 96 all : bool [default True]
91 97 Whether the dependency should be considered met when *all* depended-upon tasks have completed,
92 98 or as soon as *any* one has completed.
93 99 success_only : bool [default True]
94 100 Whether to consider only successes for Dependencies, or consider failures as well.
95 101 If `all=success_only=True`, then this task will fail with an ImpossibleDependency
96 102 as soon as the first depended-upon task fails.
97 103 """
98 104
99 105 all=True
100 106 success_only=True
101 107
102 108 def __init__(self, dependencies=[], all=True, success_only=True):
103 109 if isinstance(dependencies, dict):
104 110 # load from dict
105 111 all = dependencies.get('all', True)
106 112 success_only = dependencies.get('success_only', success_only)
107 113 dependencies = dependencies.get('dependencies', [])
108 114 ids = []
109 115 if isinstance(dependencies, AsyncResult):
110 116 ids.extend(dependencies.msg_ids)
111 117 else:
112 118 for d in dependencies:
113 119 if isinstance(d, basestring):
114 120 ids.append(d)
115 121 elif isinstance(d, AsyncResult):
116 122 ids.extend(d.msg_ids)
117 123 else:
118 124 raise TypeError("invalid dependency type: %r"%type(d))
119 125 set.__init__(self, ids)
120 126 self.all = all
121 127 self.success_only=success_only
122 128
123 129 def check(self, completed, failed=None):
124 130 if failed is not None and not self.success_only:
125 131 completed = completed.union(failed)
126 132 if len(self) == 0:
127 133 return True
128 134 if self.all:
129 135 return self.issubset(completed)
130 136 else:
131 137 return not self.isdisjoint(completed)
132 138
133 139 def unreachable(self, failed):
134 140 if len(self) == 0 or len(failed) == 0 or not self.success_only:
135 141 return False
136 142 # print self, self.success_only, self.all, failed
137 143 if self.all:
138 144 return not self.isdisjoint(failed)
139 145 else:
140 146 return self.issubset(failed)
141 147
142 148
143 149 def as_dict(self):
144 150 """Represent this dependency as a dict. For json compatibility."""
145 151 return dict(
146 152 dependencies=list(self),
147 153 all=self.all,
148 154 success_only=self.success_only,
149 155 )
150 156
151 157
152 158 __all__ = ['depend', 'require', 'dependent', 'Dependency']
153 159
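A short sketch of the pieces defined above: the `@depend`/`@require` decorators build picklable `dependent` callables for engine-side checks, and `Dependency` tracks msg_id sets. The import path is assumed from this branch's layout.

    from IPython.zmq.parallel.dependency import depend, require, Dependency

    def has_numpy():
        try:
            import numpy
        except ImportError:
            return False
        return True

    @depend(has_numpy)   # run f only on engines where has_numpy() is True
    def f(a):
        return a

    @require('numpy')    # shorthand for the same importability check
    def norm(a):
        import numpy
        return numpy.linalg.norm(a, 2)

    # Dependency is a set of msg_ids plus matching rules:
    dep = Dependency(['msg-a', 'msg-b'], all=True, success_only=True)
    dep.check(set(['msg-a']))            # False: msg-b not yet completed
    dep.check(set(['msg-a', 'msg-b']))   # True: all dependencies met
    dep.as_dict()                        # json-compatible representation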
@@ -1,148 +1,155 b''
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 it handles registration, etc. and launches a kernel
4 4 connected to the Controller's Schedulers.
5 5 """
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010-2011 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
12
6 13 from __future__ import print_function
7 14
8 15 import sys
9 16 import time
10 17
11 18 import zmq
12 19 from zmq.eventloop import ioloop, zmqstream
13 20
14 21 # internal
15 22 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat
16 23 # from IPython.utils.localinterfaces import LOCALHOST
17 24
18 25 from . import heartmonitor
19 26 from .factory import RegistrationFactory
20 27 from .streamkernel import Kernel
21 28 from .streamsession import Message
22 29 from .util import disambiguate_url
23 30
24 31 class EngineFactory(RegistrationFactory):
25 32 """IPython engine"""
26 33
27 34 # configurables:
28 35 user_ns=Dict(config=True)
29 36 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
30 37 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
31 38 location=Str(config=True)
32 39 timeout=CFloat(2,config=True)
33 40
34 41 # not configurable:
35 42 id=Int(allow_none=True)
36 43 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
37 44 kernel=Instance(Kernel)
38 45
39 46
40 47 def __init__(self, **kwargs):
41 48 super(EngineFactory, self).__init__(**kwargs)
42 49 ctx = self.context
43 50
44 51 reg = ctx.socket(zmq.XREQ)
45 52 reg.setsockopt(zmq.IDENTITY, self.ident)
46 53 reg.connect(self.url)
47 54 self.registrar = zmqstream.ZMQStream(reg, self.loop)
48 55
49 56 def register(self):
50 57 """send the registration_request"""
51 58
52 59 self.log.info("registering")
53 60 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
54 61 self.registrar.on_recv(self.complete_registration)
55 62 # print (self.session.key)
56 63 self.session.send(self.registrar, "registration_request",content=content)
57 64
58 65 def complete_registration(self, msg):
59 66 # print msg
60 67 self._abort_dc.stop()
61 68 ctx = self.context
62 69 loop = self.loop
63 70 identity = self.ident
64 71
65 72 idents,msg = self.session.feed_identities(msg)
66 73 msg = Message(self.session.unpack_message(msg))
67 74
68 75 if msg.content.status == 'ok':
69 76 self.id = int(msg.content.id)
70 77
71 78 # create Shell Streams (MUX, Task, etc.):
72 79 queue_addr = msg.content.mux
73 80 shell_addrs = [ str(queue_addr) ]
74 81 task_addr = msg.content.task
75 82 if task_addr:
76 83 shell_addrs.append(str(task_addr))
77 84
78 85 # Uncomment this to go back to two-socket model
79 86 # shell_streams = []
80 87 # for addr in shell_addrs:
81 88 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
82 89 # stream.setsockopt(zmq.IDENTITY, identity)
83 90 # stream.connect(disambiguate_url(addr, self.location))
84 91 # shell_streams.append(stream)
85 92
86 93 # Now use only one shell stream for mux and tasks
87 94 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
88 95 stream.setsockopt(zmq.IDENTITY, identity)
89 96 shell_streams = [stream]
90 97 for addr in shell_addrs:
91 98 stream.connect(disambiguate_url(addr, self.location))
92 99 # end single stream-socket
93 100
94 101 # control stream:
95 102 control_addr = str(msg.content.control)
96 103 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
97 104 control_stream.setsockopt(zmq.IDENTITY, identity)
98 105 control_stream.connect(disambiguate_url(control_addr, self.location))
99 106
100 107 # create iopub stream:
101 108 iopub_addr = msg.content.iopub
102 109 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
103 110 iopub_stream.setsockopt(zmq.IDENTITY, identity)
104 111 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
105 112
106 113 # launch heartbeat
107 114 hb_addrs = msg.content.heartbeat
108 115 # print (hb_addrs)
109 116
110 117 # Redirect output streams and set a display hook.
111 118 if self.out_stream_factory:
112 119 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
113 120 sys.stdout.topic = 'engine.%i.stdout'%self.id
114 121 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
115 122 sys.stderr.topic = 'engine.%i.stderr'%self.id
116 123 if self.display_hook_factory:
117 124 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
118 125 sys.displayhook.topic = 'engine.%i.pyout'%self.id
119 126
120 127 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
121 128 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
122 129 loop=loop, user_ns = self.user_ns, logname=self.log.name)
123 130 self.kernel.start()
124 131 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
125 132 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
126 133 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
127 134 heart.start()
128 135
129 136
130 137 else:
131 138 self.log.fatal("Registration Failed: %s"%msg)
132 139 raise Exception("Registration Failed: %s"%msg)
133 140
134 141 self.log.info("Completed registration with id %i"%self.id)
135 142
136 143
137 144 def abort(self):
138 145 self.log.fatal("Registration timed out")
139 146 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
140 147 time.sleep(1)
141 148 sys.exit(255)
142 149
143 150 def start(self):
144 151 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
145 152 dc.start()
146 153 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
147 154 self._abort_dc.start()
148 155
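A sketch of starting an engine with the factory above. The constructor keyword comes from `RegistrationFactory` and is an assumption here; a controller must already be listening on the registration URL.

    from zmq.eventloop import ioloop
    from IPython.zmq.parallel.engine import EngineFactory

    # 'url' is the registration endpoint (kwarg name assumed)
    ef = EngineFactory(url='tcp://127.0.0.1:10101')
    ef.start()   # schedules register() plus the registration-timeout abort
    ioloop.IOLoop.instance().start()   # run until the engine exits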
@@ -1,119 +1,125 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3
4 4 ************
5 5 NOTE: Most of this module has been deprecated by moving to Configurables
6 6 ************
7 7 """
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2010-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
8 14
9 15 # Standard library imports.
10 16 import atexit
11 17 import logging
12 18 import os
13 19 import stat
14 20 import socket
15 21 import sys
16 22 from signal import signal, SIGINT, SIGABRT, SIGTERM
17 23 from subprocess import Popen, PIPE
18 24 try:
19 25 from signal import SIGKILL
20 26 except ImportError:
21 27 SIGKILL=None
22 28
23 29 # System library imports.
24 30 import zmq
25 31 from zmq.log import handlers
26 32
27 33 # Local imports.
28 34 from IPython.core.ultratb import FormattedTB
29 35 from IPython.external.argparse import ArgumentParser
30 36 from IPython.zmq.log import EnginePUBHandler
31 37
32 38 _random_ports = set()
33 39
34 40 def select_random_ports(n):
35 41 """Selects and return n random ports that are available."""
36 42 ports = []
37 43 for i in xrange(n):
38 44 sock = socket.socket()
39 45 sock.bind(('', 0))
40 46 while sock.getsockname()[1] in _random_ports:
41 47 sock.close()
42 48 sock = socket.socket()
43 49 sock.bind(('', 0))
44 50 ports.append(sock)
45 51 for i, sock in enumerate(ports):
46 52 port = sock.getsockname()[1]
47 53 sock.close()
48 54 ports[i] = port
49 55 _random_ports.add(port)
50 56 return ports
51 57
52 58 def signal_children(children):
53 59 """Relay interupt/term signals to children, for more solid process cleanup."""
54 60 def terminate_children(sig, frame):
55 61 logging.critical("Got signal %i, terminating children..."%sig)
56 62 for child in children:
57 63 child.terminate()
58 64
59 65 sys.exit(sig != SIGINT)
60 66 # sys.exit(sig)
61 67 for sig in (SIGINT, SIGABRT, SIGTERM):
62 68 signal(sig, terminate_children)
63 69
64 70 def generate_exec_key(keyfile):
65 71 import uuid
66 72 newkey = str(uuid.uuid4())
67 73 with open(keyfile, 'w') as f:
68 74 # f.write('ipython-key ')
69 75 f.write(newkey+'\n')
70 76 # set user-only RW permissions (0600)
71 77 # this will have no effect on Windows
72 78 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
73 79
74 80
75 81 def integer_loglevel(loglevel):
76 82 try:
77 83 loglevel = int(loglevel)
78 84 except ValueError:
79 85 if isinstance(loglevel, str):
80 86 loglevel = getattr(logging, loglevel)
81 87 return loglevel
82 88
83 89 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
84 90 logger = logging.getLogger(logname)
85 91 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
86 92 # don't add a second PUBHandler
87 93 return
88 94 loglevel = integer_loglevel(loglevel)
89 95 lsock = context.socket(zmq.PUB)
90 96 lsock.connect(iface)
91 97 handler = handlers.PUBHandler(lsock)
92 98 handler.setLevel(loglevel)
93 99 handler.root_topic = root
94 100 logger.addHandler(handler)
95 101 logger.setLevel(loglevel)
96 102
97 103 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
98 104 logger = logging.getLogger()
99 105 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
100 106 # don't add a second PUBHandler
101 107 return
102 108 loglevel = integer_loglevel(loglevel)
103 109 lsock = context.socket(zmq.PUB)
104 110 lsock.connect(iface)
105 111 handler = EnginePUBHandler(engine, lsock)
106 112 handler.setLevel(loglevel)
107 113 logger.addHandler(handler)
108 114 logger.setLevel(loglevel)
109 115
110 116 def local_logger(logname, loglevel=logging.DEBUG):
111 117 loglevel = integer_loglevel(loglevel)
112 118 logger = logging.getLogger(logname)
113 119 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
114 120 # don't add a second StreamHandler
115 121 return
116 122 handler = logging.StreamHandler()
117 123 handler.setLevel(loglevel)
118 124 logger.addHandler(handler)
119 125 logger.setLevel(loglevel)
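The port and logging helpers above in action (module path assumed):

    import logging
    from IPython.zmq.parallel.entry_point import select_random_ports, local_logger

    ports = select_random_ports(3)        # e.g. [50321, 50322, 50323]
    local_logger('myapp', logging.INFO)   # attaches a single StreamHandler
    logging.getLogger('myapp').info("allocated ports: %s" % ports)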
@@ -1,158 +1,164 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5 """
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010-2011 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
6 12
7 13 from __future__ import print_function
8 14 import time
9 15 import logging
10 16 import uuid
11 17
12 18 import zmq
13 19 from zmq.devices import ProcessDevice,ThreadDevice
14 20 from zmq.eventloop import ioloop, zmqstream
15 21
16 22 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
17 23 from .factory import LoggingFactory
18 24
19 25 class Heart(object):
20 26 """A basic heart object for responding to a HeartMonitor.
21 27 This is a simple wrapper with defaults for the most common
22 28 Device model for responding to heartbeats.
23 29
24 30 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
25 31 SUB/XREQ for in/out.
26 32
27 33 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
28 34 device=None
29 35 id=None
30 36 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
31 37 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
32 38 self.device.daemon=True
33 39 self.device.connect_in(in_addr)
34 40 self.device.connect_out(out_addr)
35 41 if in_type == zmq.SUB:
36 42 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
37 43 if heart_id is None:
38 44 heart_id = str(uuid.uuid4())
39 45 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
40 46 self.id = heart_id
41 47
42 48 def start(self):
43 49 return self.device.start()
44 50
45 51 class HeartMonitor(LoggingFactory):
46 52 """A basic HeartMonitor class
47 53 pingstream: a PUB stream
48 54 pongstream: an XREP stream
49 55 period: the period of the heartbeat in milliseconds"""
50 56
51 57 period=CFloat(1000, config=True) # in milliseconds
52 58
53 59 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
54 60 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
55 61 loop = Instance('zmq.eventloop.ioloop.IOLoop')
56 62 def _loop_default(self):
57 63 return ioloop.IOLoop.instance()
58 64 debug=Bool(False)
59 65
60 66 # not settable:
61 67 hearts=Set()
62 68 responses=Set()
63 69 on_probation=Set()
64 70 last_ping=CFloat(0)
65 71 _new_handlers = Set()
66 72 _failure_handlers = Set()
67 73 lifetime = CFloat(0)
68 74 tic = CFloat(0)
69 75
70 76 def __init__(self, **kwargs):
71 77 super(HeartMonitor, self).__init__(**kwargs)
72 78
73 79 self.pongstream.on_recv(self.handle_pong)
74 80
75 81 def start(self):
76 82 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
77 83 self.caller.start()
78 84
79 85 def add_new_heart_handler(self, handler):
80 86 """add a new handler for new hearts"""
81 87 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
82 88 self._new_handlers.add(handler)
83 89
84 90 def add_heart_failure_handler(self, handler):
85 91 """add a new handler for heart failure"""
86 92 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
87 93 self._failure_handlers.add(handler)
88 94
89 95 def beat(self):
90 96 self.pongstream.flush()
91 97 self.last_ping = self.lifetime
92 98
93 99 toc = time.time()
94 100 self.lifetime += toc-self.tic
95 101 self.tic = toc
96 102 # self.log.debug("heartbeat::%s"%self.lifetime)
97 103 goodhearts = self.hearts.intersection(self.responses)
98 104 missed_beats = self.hearts.difference(goodhearts)
99 105 heartfailures = self.on_probation.intersection(missed_beats)
100 106 newhearts = self.responses.difference(goodhearts)
101 107 map(self.handle_new_heart, newhearts)
102 108 map(self.handle_heart_failure, heartfailures)
103 109 self.on_probation = missed_beats.intersection(self.hearts)
104 110 self.responses = set()
105 111 # print self.on_probation, self.hearts
106 112 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
107 113 self.pingstream.send(str(self.lifetime))
108 114
109 115 def handle_new_heart(self, heart):
110 116 if self._new_handlers:
111 117 for handler in self._new_handlers:
112 118 handler(heart)
113 119 else:
114 120 self.log.info("heartbeat::yay, got new heart %s!"%heart)
115 121 self.hearts.add(heart)
116 122
117 123 def handle_heart_failure(self, heart):
118 124 if self._failure_handlers:
119 125 for handler in self._failure_handlers:
120 126 try:
121 127 handler(heart)
122 128 except Exception as e:
123 129 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
124 130 pass
125 131 else:
126 132 self.log.info("heartbeat::Heart %s failed :("%heart)
127 133 self.hearts.remove(heart)
128 134
129 135
130 136 def handle_pong(self, msg):
131 137 "a heart just beat"
132 138 if msg[1] == str(self.lifetime):
133 139 delta = time.time()-self.tic
134 140 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
135 141 self.responses.add(msg[0])
136 142 elif msg[1] == str(self.last_ping):
137 143 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
138 144 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
139 145 self.responses.add(msg[0])
140 146 else:
141 147 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
142 148 (msg[1],self.lifetime))
143 149
144 150
145 151 if __name__ == '__main__':
146 152 loop = ioloop.IOLoop.instance()
147 153 context = zmq.Context()
148 154 pub = context.socket(zmq.PUB)
149 155 pub.bind('tcp://127.0.0.1:5555')
150 156 xrep = context.socket(zmq.XREP)
151 157 xrep.bind('tcp://127.0.0.1:5556')
152 158
153 159 outstream = zmqstream.ZMQStream(pub, loop)
154 160 instream = zmqstream.ZMQStream(xrep, loop)
155 161
156 162 hb = HeartMonitor(loop, outstream, instream)
157 163
158 164 loop.start()
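To exercise the demo `__main__` block above, a second process can run a `Heart` pointed at the demo's two sockets; the device answers each ping with its XREQ identity (import path assumed):

    import time
    from IPython.zmq.parallel.heartmonitor import Heart

    heart = Heart('tcp://127.0.0.1:5555',   # SUB side: the monitor's PUB
                  'tcp://127.0.0.1:5556',   # XREQ side: the monitor's XREP
                  heart_id='demo-heart')
    heart.start()   # runs the FORWARDER device in a daemon thread

    while True:     # keep the main thread alive; the device is a daemon
        time.sleep(1)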
@@ -1,90 +1,97 b''
1 1 #!/usr/bin/env python
2 """Old ipcluster script. Possibly to be removed."""
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2010-2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
2 9 from __future__ import print_function
3 10
4 11 import os
5 12 import sys
6 13 import time
7 14 from subprocess import Popen, PIPE
8 15
9 16 from IPython.external.argparse import ArgumentParser, SUPPRESS
10 17
11 18 def _filter_arg(flag, args):
12 19 filtered = []
13 20 if flag in args:
14 21 filtered.append(flag)
15 22 idx = args.index(flag)
16 23 if len(args) > idx+1:
17 24 if not args[idx+1].startswith('-'):
18 25 filtered.append(args[idx+1])
19 26 return filtered
20 27
21 28 def filter_args(flags, args=sys.argv[1:]):
22 29 filtered = []
23 30 for flag in flags:
24 31 if isinstance(flag, (list,tuple)):
25 32 for f in flag:
26 33 filtered.extend(_filter_arg(f, args))
27 34 else:
28 35 filtered.extend(_filter_arg(flag, args))
29 36 return filtered
30 37
31 38 def _strip_arg(flag, args):
32 39 while flag in args:
33 40 idx = args.index(flag)
34 41 args.pop(idx)
35 42 if len(args) > idx:
36 43 if not args[idx].startswith('-'):
37 44 args.pop(idx)
38 45
39 46 def strip_args(flags, args=sys.argv[1:]):
40 47 args = list(args)
41 48 for flag in flags:
42 49 if isinstance(flag, (list,tuple)):
43 50 for f in flag:
44 51 _strip_arg(f, args)
45 52 else:
46 53 _strip_arg(flag, args)
47 54 return args
48 55
49 56
50 57 def launch_process(mod, args):
51 58 """Launch a controller or engine in a subprocess."""
52 59 code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
53 60 arguments = [ sys.executable, '-c', code ] + args
54 61 blackholew = file(os.devnull, 'w')
55 62 blackholer = file(os.devnull, 'r')
56 63
57 64 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE)
58 65 return proc
59 66
60 67 def main():
61 68 parser = ArgumentParser(argument_default=SUPPRESS)
62 69 parser.add_argument('--n', '-n', type=int, default=1,
63 70 help="The number of engines to start.")
64 71 ns,args = parser.parse_known_args()
65 72 n = ns.n
66 73
67 74 controller = launch_process('ipcontrollerapp', args)
68 75 for i in range(10):
69 76 time.sleep(.1)
70 77 if controller.poll() is not None:
71 78 print("Controller failed to launch:")
72 79 print (controller.stderr.read())
73 80 sys.exit(255)
74 81
75 82 print("Launched Controller")
76 83 engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
77 84 print("%i Engines started"%n)
78 85
79 86 def wait_quietly(p):
80 87 try:
81 88 p.wait()
82 89 except KeyboardInterrupt:
83 90 pass
84 91
85 92 wait_quietly(controller)
86 93 map(wait_quietly, engines)
87 94 print ("Engines cleaned up.")
88 95
89 96 if __name__ == '__main__':
90 97 main() No newline at end of file
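The argument-splitting helpers above let the script claim a few flags for itself and forward the rest to the controller/engine subprocesses. A quick sketch (module path assumed):

    from IPython.zmq.parallel.ipcluster import filter_args, strip_args

    argv = ['--n', '4', '--profile', 'mpi']
    filter_args(['--n'], argv)   # ['--n', '4']: the flag plus its value
    strip_args(['--n'], argv)    # ['--profile', 'mpi']: everything else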
@@ -1,219 +1,225 b''
1 1 """KernelStarter class that intercepts Control Queue messages, and handles process management."""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010-2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
2 8
3 9 import zmq
4 10 from zmq.eventloop import ioloop, zmqstream
5 11 from .streamsession import StreamSession
6 12
7 13 class KernelStarter(object):
8 14 """Object for resetting/killing the Kernel."""
9 15
10 16
11 17 def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs):
12 18 self.session = session
13 19 self.upstream = upstream
14 20 self.downstream = downstream
15 21 self.kernel_args = kernel_args
16 22 self.kernel_kwargs = kernel_kwargs
17 23 self.handlers = {}
18 24 for method in 'shutdown_request shutdown_reply'.split():
19 25 self.handlers[method] = getattr(self, method)
20 26
21 27 def start(self):
22 28 self.upstream.on_recv(self.dispatch_request)
23 29 self.downstream.on_recv(self.dispatch_reply)
24 30
25 31 #--------------------------------------------------------------------------
26 32 # Dispatch methods
27 33 #--------------------------------------------------------------------------
28 34
29 35 def dispatch_request(self, raw_msg):
30 36 idents, msg = self.session.feed_identities(raw_msg)
31 37 try:
32 38 msg = self.session.unpack_message(msg, content=False)
33 39 except:
34 40 print ("bad msg: %s"%msg)
35 41
36 42 msgtype = msg['msg_type']
37 43 handler = self.handlers.get(msgtype, None)
38 44 if handler is None:
39 45 self.downstream.send_multipart(raw_msg, copy=False)
40 46 else:
41 47 handler(msg)
42 48
43 49 def dispatch_reply(self, raw_msg):
44 50 idents, msg = self.session.feed_identities(raw_msg)
45 51 try:
46 52 msg = self.session.unpack_message(msg, content=False)
47 53 except:
48 54 print ("bad msg: %s"%msg)
49 55
50 56 msgtype = msg['msg_type']
51 57 handler = self.handlers.get(msgtype, None)
52 58 if handler is None:
53 59 self.upstream.send_multipart(raw_msg, copy=False)
54 60 else:
55 61 handler(msg)
56 62
57 63 #--------------------------------------------------------------------------
58 64 # Handlers
59 65 #--------------------------------------------------------------------------
60 66
61 67 def shutdown_request(self, msg):
62 68 """"""
63 69 self.downstream.send_multipart(msg)
64 70
65 71 #--------------------------------------------------------------------------
66 72 # Kernel process management methods, from KernelManager:
67 73 #--------------------------------------------------------------------------
68 74
69 75 def _check_local(addr):
70 76 if isinstance(addr, tuple):
71 77 addr = addr[0]
72 78 return addr in LOCAL_IPS
73 79
74 80 def start_kernel(self, **kw):
75 81 """Starts a kernel process and configures the manager to use it.
76 82
77 83 If random ports (port=0) are being used, this method must be called
78 84 before the channels are created.
79 85
80 86 Parameters
81 87 ----------
82 88 ipython : bool, optional (default True)
83 89 Whether to use an IPython kernel instead of a plain Python kernel.
84 90 """
85 91 self.kernel = Process(target=make_kernel, args=self.kernel_args,
86 92 kwargs=self.kernel_kwargs)
87 93
88 94 def shutdown_kernel(self, restart=False):
89 95 """ Attempts to the stop the kernel process cleanly. If the kernel
90 96 cannot be stopped, it is killed, if possible.
91 97 """
92 98 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
93 99 if sys.platform == 'win32':
94 100 self.kill_kernel()
95 101 return
96 102
97 103 # Don't send any additional kernel kill messages immediately, to give
98 104 # the kernel a chance to properly execute shutdown actions. Wait for at
99 105 # most 1s, checking every 0.1s.
100 106 self.xreq_channel.shutdown(restart=restart)
101 107 for i in range(10):
102 108 if self.is_alive:
103 109 time.sleep(0.1)
104 110 else:
105 111 break
106 112 else:
107 113 # OK, we've waited long enough.
108 114 if self.has_kernel:
109 115 self.kill_kernel()
110 116
111 117 def restart_kernel(self, now=False):
112 118 """Restarts a kernel with the same arguments that were used to launch
113 119 it. If the old kernel was launched with random ports, the same ports
114 120 will be used for the new kernel.
115 121
116 122 Parameters
117 123 ----------
118 124 now : bool, optional
119 125 If True, the kernel is forcefully restarted *immediately*, without
120 126 having a chance to do any cleanup action. Otherwise the kernel is
121 127 given 1s to clean up before a forceful restart is issued.
122 128
123 129 In all cases the kernel is restarted, the only difference is whether
124 130 it is given a chance to perform a clean shutdown or not.
125 131 """
126 132 if self._launch_args is None:
127 133 raise RuntimeError("Cannot restart the kernel. "
128 134 "No previous call to 'start_kernel'.")
129 135 else:
130 136 if self.has_kernel:
131 137 if now:
132 138 self.kill_kernel()
133 139 else:
134 140 self.shutdown_kernel(restart=True)
135 141 self.start_kernel(**self._launch_args)
136 142
137 143 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
138 144 # unless there is some delay here.
139 145 if sys.platform == 'win32':
140 146 time.sleep(0.2)
141 147
142 148 @property
143 149 def has_kernel(self):
144 150 """Returns whether a kernel process has been specified for the kernel
145 151 manager.
146 152 """
147 153 return self.kernel is not None
148 154
149 155 def kill_kernel(self):
150 156 """ Kill the running kernel. """
151 157 if self.has_kernel:
152 158 # Pause the heart beat channel if it exists.
153 159 if self._hb_channel is not None:
154 160 self._hb_channel.pause()
155 161
156 162 # Attempt to kill the kernel.
157 163 try:
158 164 self.kernel.kill()
159 165 except OSError, e:
160 166 # In Windows, we will get an Access Denied error if the process
161 167 # has already terminated. Ignore it.
162 168 if not (sys.platform == 'win32' and e.winerror == 5):
163 169 raise
164 170 self.kernel = None
165 171 else:
166 172 raise RuntimeError("Cannot kill kernel. No kernel is running!")
167 173
168 174 def interrupt_kernel(self):
169 175 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
170 176 well supported on all platforms.
171 177 """
172 178 if self.has_kernel:
173 179 if sys.platform == 'win32':
174 180 from parentpoller import ParentPollerWindows as Poller
175 181 Poller.send_interrupt(self.kernel.win32_interrupt_event)
176 182 else:
177 183 self.kernel.send_signal(signal.SIGINT)
178 184 else:
179 185 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
180 186
181 187 def signal_kernel(self, signum):
182 188 """ Sends a signal to the kernel. Note that since only SIGTERM is
183 189 supported on Windows, this function is only useful on Unix systems.
184 190 """
185 191 if self.has_kernel:
186 192 self.kernel.send_signal(signum)
187 193 else:
188 194 raise RuntimeError("Cannot signal kernel. No kernel is running!")
189 195
190 196 @property
191 197 def is_alive(self):
192 198 """Is the kernel process still running?"""
193 199 # FIXME: not using a heartbeat means this method is broken for any
194 200 # remote kernel, it's only capable of handling local kernels.
195 201 if self.has_kernel:
196 202 if self.kernel.poll() is None:
197 203 return True
198 204 else:
199 205 return False
200 206 else:
201 207 # We didn't start the kernel with this KernelManager so we don't
202 208 # know if it is running. We should use a heartbeat for this case.
203 209 return True
204 210
205 211
206 212 def make_starter(up_addr, down_addr, *args, **kwargs):
207 213 """entry point function for launching a kernelstarter in a subprocess"""
208 214 loop = ioloop.IOLoop.instance()
209 215 ctx = zmq.Context()
210 216 session = StreamSession()
211 217 upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
212 218 upstream.connect(up_addr)
213 219 downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
214 220 downstream.connect(down_addr)
215 221
216 222 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
217 223 starter.start()
218 224 loop.start()
219 225 No newline at end of file
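A sketch of wiring up a starter via the `make_starter` entry point above; the addresses are hypothetical. It blocks in `loop.start()`, relaying shutdown requests and replies between the controller-facing (up) and kernel-facing (down) queues:

    from IPython.zmq.parallel.kernelstarter import make_starter  # path assumed

    make_starter('tcp://127.0.0.1:10201',   # up_addr
                 'tcp://127.0.0.1:10202')   # down_addr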
@@ -1,95 +1,101 b''
1 1 """RemoteNamespace object, for dict style interaction with a remote
2 2 execution kernel."""
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2010-2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
3 9
4 10 from functools import wraps
5 11 from IPython.external.decorator import decorator
6 12
7 13 def _clear():
8 14 globals().clear()
9 15
10 16 @decorator
11 17 def spinfirst(f):
12 18 @wraps(f)
13 19 def spun_method(self, *args, **kwargs):
14 20 self.spin()
15 21 return f(self, *args, **kwargs)
16 22 return spun_method
17 23
18 24 @decorator
19 25 def myblock(f, self, *args, **kwargs):
20 26 block = self.client.block
21 27 self.client.block = self.block
22 28 ret = f(self, *args, **kwargs)
23 29 self.client.block = block
24 30 return ret
25 31
26 32 class RemoteNamespace(object):
27 33 """A RemoteNamespace object, providing dictionary
28 34 access to an engine via an IPython.zmq.client object.
29 35
30 36
31 37 """
32 38 client = None
33 39 queue = None
34 40 id = None
35 41 block = False
36 42
37 43 def __init__(self, client, id):
38 44 self.client = client
39 45 self.id = id
40 46 self.block = client.block # initial state is same as client
41 47
42 48 def __repr__(self):
43 49 return "<RemoteNamespace[%i]>"%self.id
44 50
45 51 @myblock
46 52 def apply(self, f, *args, **kwargs):
47 53 """call f(*args, **kwargs) in remote namespace
48 54
49 55 This method has no access to the user namespace"""
50 56 return self.client.apply_to(self.id, f, *args, **kwargs)
51 57
52 58 @myblock
53 59 def apply_bound(self, f, *args, **kwargs):
54 60 """call `f(*args, **kwargs)` in remote namespace.
55 61
56 62 `f` will have access to the user namespace as globals()."""
57 63 return self.client.apply_bound_to(self.id, f, *args, **kwargs)
58 64
59 65 @myblock
60 66 def update(self, ns):
61 67 """update remote namespace with dict `ns`"""
62 68 return self.client.push(self.id, ns, self.block)
63 69
64 70 def get(self, key_s):
65 71 """get object(s) by `key_s` from remote namespace
66 72 will return one object if it is a key.
67 73 It also takes a list of keys, and will return a list of objects."""
68 74 return self.client.pull(self.id, key_s, self.block)
69 75
70 76 push = update
71 77 pull = get
72 78
73 79 def __getitem__(self, key):
74 80 return self.get(key)
75 81
76 82 def __setitem__(self,key,value):
77 83 self.update({key:value})
78 84
79 85 def clear(self):
80 86 """clear the remote namespace"""
81 87 return self.client.apply_bound_to(self.id, _clear)
82 88
83 89 @decorator
84 90 def withme(self, toapply):
85 91 """for use as a decorator, this turns a function into
86 92 one that executes remotely."""
87 93 @wraps(toapply)
88 94 def applied(self, *args, **kwargs):
89 95 return self.apply_bound(toapply, *args, **kwargs)
90 96 return applied
91 97
92 98
93 99
94 100
95 101
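Dict-style engine access with the class above, assuming a connected `client` object as used throughout this package (import path assumed):

    from IPython.zmq.parallel.remotenamespace import RemoteNamespace

    ns = RemoteNamespace(client, 0)   # namespace of engine 0

    def add(a, b):
        return a + b

    ns['x'] = 10          # push {'x': 10} to the engine
    ns['x']               # pull 'x' back -> 10
    ns.apply(add, 2, 3)   # remote call without namespace access -> 5
    ns.clear()            # wipe the remote globals()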
@@ -1,584 +1,590 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
7 13
8 14 #----------------------------------------------------------------------
9 15 # Imports
10 16 #----------------------------------------------------------------------
11 17
12 18 from __future__ import print_function
13 19
14 20 import logging
15 21 import sys
16 22
17 23 from datetime import datetime, timedelta
18 24 from random import randint, random
19 25 from types import FunctionType
20 26
21 27 try:
22 28 import numpy
23 29 except ImportError:
24 30 numpy = None
25 31
26 32 import zmq
27 33 from zmq.eventloop import ioloop, zmqstream
28 34
29 35 # local imports
30 36 from IPython.external.decorator import decorator
31 37 from IPython.utils.traitlets import Instance, Dict, List, Set
32 38
33 39 from . import error
34 40 from .dependency import Dependency
35 41 from .entry_point import connect_logger, local_logger
36 42 from .factory import SessionFactory
37 43
38 44
39 45 @decorator
40 46 def logged(f,self,*args,**kwargs):
41 47 # print ("#--------------------")
42 48 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
43 49 # print ("#--")
44 50 return f(self,*args, **kwargs)
45 51
46 52 #----------------------------------------------------------------------
47 53 # Chooser functions
48 54 #----------------------------------------------------------------------
49 55
50 56 def plainrandom(loads):
51 57 """Plain random pick."""
52 58 n = len(loads)
53 59 return randint(0,n-1)
54 60
55 61 def lru(loads):
56 62 """Always pick the front of the line.
57 63
58 64 The content of `loads` is ignored.
59 65
60 66 Assumes LRU ordering of loads, with oldest first.
61 67 """
62 68 return 0
63 69
64 70 def twobin(loads):
65 71 """Pick two at random, use the LRU of the two.
66 72
67 73 The content of loads is ignored.
68 74
69 75 Assumes LRU ordering of loads, with oldest first.
70 76 """
71 77 n = len(loads)
72 78 a = randint(0,n-1)
73 79 b = randint(0,n-1)
74 80 return min(a,b)
75 81
76 82 def weighted(loads):
77 83 """Pick two at random using inverse load as weight.
78 84
79 85 Return the less loaded of the two.
80 86 """
81 87 # weight 0 a million times more than 1:
82 88 weights = 1./(1e-6+numpy.array(loads))
83 89 sums = weights.cumsum()
84 90 t = sums[-1]
85 91 x = random()*t
86 92 y = random()*t
87 93 idx = 0
88 94 idy = 0
89 95 while sums[idx] < x:
90 96 idx += 1
91 97 while sums[idy] < y:
92 98 idy += 1
93 99 if weights[idy] > weights[idx]:
94 100 return idy
95 101 else:
96 102 return idx
97 103
98 104 def leastload(loads):
99 105 """Always choose the lowest load.
100 106
101 107 If the lowest load occurs more than once, the first
102 108 occurrence will be used. If loads has LRU ordering, this means
103 109 the LRU of those with the lowest load is chosen.
104 110 """
105 111 return loads.index(min(loads))
106 112
107 113 #---------------------------------------------------------------------
108 114 # Classes
109 115 #---------------------------------------------------------------------
110 116 # store empty default dependency:
111 117 MET = Dependency([])
112 118
113 119 class TaskScheduler(SessionFactory):
114 120 """Python TaskScheduler object.
115 121
116 122 This is the simplest object that supports msg_id based
117 123 DAG dependencies. *Only* task msg_ids are checked, not
118 124 msg_ids of jobs submitted via the MUX queue.
119 125
120 126 """
121 127
122 128 # input arguments:
123 129 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
124 130 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
125 131 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
126 132 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
127 133 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128 134
129 135 # internals:
130 136 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
131 137 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
132 138 pending = Dict() # dict by engine_uuid of submitted tasks
133 139 completed = Dict() # dict by engine_uuid of completed tasks
134 140 failed = Dict() # dict by engine_uuid of failed tasks
135 141 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
136 142 clients = Dict() # dict by msg_id for who submitted the task
137 143 targets = List() # list of target IDENTs
138 144 loads = List() # list of engine loads
139 145 all_completed = Set() # set of all completed tasks
140 146 all_failed = Set() # set of all failed tasks
141 147 all_done = Set() # set of all finished tasks=union(completed,failed)
142 148 all_ids = Set() # set of all submitted task IDs
143 149 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
144 150 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
145 151
146 152
147 153 def start(self):
148 154 self.engine_stream.on_recv(self.dispatch_result, copy=False)
149 155 self._notification_handlers = dict(
150 156 registration_notification = self._register_engine,
151 157 unregistration_notification = self._unregister_engine
152 158 )
153 159 self.notifier_stream.on_recv(self.dispatch_notification)
154 160 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
155 161 self.auditor.start()
156 162 self.log.info("Scheduler started...%r"%self)
157 163
158 164 def resume_receiving(self):
159 165 """Resume accepting jobs."""
160 166 self.client_stream.on_recv(self.dispatch_submission, copy=False)
161 167
162 168 def stop_receiving(self):
163 169 """Stop accepting jobs while there are no engines.
164 170 Leave them in the ZMQ queue."""
165 171 self.client_stream.on_recv(None)
166 172
167 173 #-----------------------------------------------------------------------
168 174 # [Un]Registration Handling
169 175 #-----------------------------------------------------------------------
170 176
171 177 def dispatch_notification(self, msg):
172 178 """dispatch register/unregister events."""
173 179 idents,msg = self.session.feed_identities(msg)
174 180 msg = self.session.unpack_message(msg)
175 181 msg_type = msg['msg_type']
176 182 handler = self._notification_handlers.get(msg_type, None)
177 183 if handler is None:
178 184 raise Exception("Unhandled message type: %s"%msg_type)
179 185 else:
180 186 try:
181 187 handler(str(msg['content']['queue']))
182 188 except KeyError:
183 189 self.log.error("task::Invalid notification msg: %s"%msg)
184 190
185 191 @logged
186 192 def _register_engine(self, uid):
187 193 """New engine with ident `uid` became available."""
188 194 # head of the line:
189 195 self.targets.insert(0,uid)
190 196 self.loads.insert(0,0)
191 197 # initialize sets
192 198 self.completed[uid] = set()
193 199 self.failed[uid] = set()
194 200 self.pending[uid] = {}
195 201 if len(self.targets) == 1:
196 202 self.resume_receiving()
197 203
198 204 def _unregister_engine(self, uid):
199 205 """Existing engine with ident `uid` became unavailable."""
200 206 if len(self.targets) == 1:
201 207 # this was our only engine
202 208 self.stop_receiving()
203 209
204 210 # handle any potentially finished tasks:
205 211 self.engine_stream.flush()
206 212
207 213 self.completed.pop(uid)
208 214 self.failed.pop(uid)
209 215 # don't pop destinations, because it might be used later
210 216 # map(self.destinations.pop, self.completed.pop(uid))
211 217 # map(self.destinations.pop, self.failed.pop(uid))
212 218
213 219 idx = self.targets.index(uid)
214 220 self.targets.pop(idx)
215 221 self.loads.pop(idx)
216 222
217 223 # wait 5 seconds before cleaning up pending jobs, since the results might
218 224 # still be incoming
219 225 if self.pending[uid]:
220 226 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
221 227 dc.start()
222 228
223 229 @logged
224 230 def handle_stranded_tasks(self, engine):
225 231 """Deal with jobs resident in an engine that died."""
226 232 lost = self.pending.pop(engine)
227 233
228 234 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
229 235 self.all_failed.add(msg_id)
230 236 self.all_done.add(msg_id)
231 237 idents,msg = self.session.feed_identities(raw_msg, copy=False)
232 238 msg = self.session.unpack_message(msg, copy=False, content=False)
233 239 parent = msg['header']
234 240 idents = [idents[0],engine]+idents[1:]
235 241 print (idents)
236 242 try:
237 243 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
238 244 except:
239 245 content = error.wrap_exception()
240 246 msg = self.session.send(self.client_stream, 'apply_reply', content,
241 247 parent=parent, ident=idents)
242 248 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
243 249 self.update_graph(msg_id)
244 250
245 251
246 252 #-----------------------------------------------------------------------
247 253 # Job Submission
248 254 #-----------------------------------------------------------------------
249 255 @logged
250 256 def dispatch_submission(self, raw_msg):
251 257 """Dispatch job submission to appropriate handlers."""
252 258 # ensure targets up to date:
253 259 self.notifier_stream.flush()
254 260 try:
255 261 idents, msg = self.session.feed_identities(raw_msg, copy=False)
256 262 msg = self.session.unpack_message(msg, content=False, copy=False)
257 263 except:
258 264 self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
259 265 return
260 266
261 267 # send to monitor
262 268 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
263 269
264 270 header = msg['header']
265 271 msg_id = header['msg_id']
266 272 self.all_ids.add(msg_id)
267 273
268 274 # targets
269 275 targets = set(header.get('targets', []))
270 276
271 277 # time dependencies
272 278 after = Dependency(header.get('after', []))
273 279 if after.all:
274 280 after.difference_update(self.all_completed)
275 281 if not after.success_only:
276 282 after.difference_update(self.all_failed)
277 283 if after.check(self.all_completed, self.all_failed):
278 284 # recast as empty set, if `after` already met,
279 285 # to prevent unnecessary set comparisons
280 286 after = MET
281 287
282 288 # location dependencies
283 289 follow = Dependency(header.get('follow', []))
284 290
285 291 # turn timeouts into datetime objects:
286 292 timeout = header.get('timeout', None)
287 293 if timeout:
288 294 timeout = datetime.now() + timedelta(0,timeout,0)
289 295
290 296 args = [raw_msg, targets, after, follow, timeout]
291 297
292 298 # validate and reduce dependencies:
293 299 for dep in after,follow:
294 300 # check valid:
295 301 if msg_id in dep or dep.difference(self.all_ids):
296 302 self.depending[msg_id] = args
297 303 return self.fail_unreachable(msg_id, error.InvalidDependency)
298 304 # check if unreachable:
299 305 if dep.unreachable(self.all_failed):
300 306 self.depending[msg_id] = args
301 307 return self.fail_unreachable(msg_id)
302 308
303 309 if after.check(self.all_completed, self.all_failed):
304 310 # time deps already met, try to run
305 311 if not self.maybe_run(msg_id, *args):
306 312 # can't run yet
307 313 self.save_unmet(msg_id, *args)
308 314 else:
309 315 self.save_unmet(msg_id, *args)
310 316
311 317 # @logged
312 318 def audit_timeouts(self):
313 319 """Audit all waiting tasks for expired timeouts."""
314 320 now = datetime.now()
315 321 for msg_id in self.depending.keys():
316 322 # must recheck, in case one failure cascaded to another:
317 323 if msg_id in self.depending:
318 324                 raw,targets,after,follow,timeout = self.depending[msg_id]
319 325 if timeout and timeout < now:
320 326 self.fail_unreachable(msg_id, timeout=True)
321 327
322 328 @logged
323 329 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
324 330 """a task has become unreachable, send a reply with an ImpossibleDependency
325 331 error."""
326 332 if msg_id not in self.depending:
327 333 self.log.error("msg %r already failed!"%msg_id)
328 334 return
329 335 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
330 336 for mid in follow.union(after):
331 337 if mid in self.graph:
332 338 self.graph[mid].remove(msg_id)
333 339
334 340 # FIXME: unpacking a message I've already unpacked, but didn't save:
335 341 idents,msg = self.session.feed_identities(raw_msg, copy=False)
336 342 msg = self.session.unpack_message(msg, copy=False, content=False)
337 343 header = msg['header']
338 344
339 345 try:
340 346 raise why()
341 347 except:
342 348 content = error.wrap_exception()
343 349
344 350 self.all_done.add(msg_id)
345 351 self.all_failed.add(msg_id)
346 352
347 353 msg = self.session.send(self.client_stream, 'apply_reply', content,
348 354 parent=header, ident=idents)
349 355 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
350 356
351 357 self.update_graph(msg_id, success=False)
352 358
353 359 @logged
354 360 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
355 361 """check location dependencies, and run if they are met."""
356 362 blacklist = self.blacklist.setdefault(msg_id, set())
357 363 if follow or targets or blacklist:
358 364 # we need a can_run filter
359 365 def can_run(idx):
360 366 target = self.targets[idx]
361 367 # check targets
362 368 if targets and target not in targets:
363 369 return False
364 370 # check blacklist
365 371 if target in blacklist:
366 372 return False
367 373 # check follow
368 374 return follow.check(self.completed[target], self.failed[target])
369 375
370 376 indices = filter(can_run, range(len(self.targets)))
371 377 if not indices:
372 378 # couldn't run
373 379 if follow.all:
374 380 # check follow for impossibility
375 381 dests = set()
376 382 relevant = self.all_completed if follow.success_only else self.all_done
377 383 for m in follow.intersection(relevant):
378 384 dests.add(self.destinations[m])
379 385 if len(dests) > 1:
380 386 self.fail_unreachable(msg_id)
381 387 return False
382 388 if targets:
383 389 # check blacklist+targets for impossibility
384 390 targets.difference_update(blacklist)
385 391 if not targets or not targets.intersection(self.targets):
386 392 self.fail_unreachable(msg_id)
387 393 return False
388 394 return False
389 395 else:
390 396 indices = None
391 397
392 398 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
393 399 return True
394 400
395 401 @logged
396 402 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
397 403 """Save a message for later submission when its dependencies are met."""
398 404 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
399 405 # track the ids in follow or after, but not those already finished
400 406 for dep_id in after.union(follow).difference(self.all_done):
401 407 if dep_id not in self.graph:
402 408 self.graph[dep_id] = set()
403 409 self.graph[dep_id].add(msg_id)
404 410
405 411 @logged
406 412 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
407 413 """Submit a task to any of a subset of our targets."""
408 414 if indices:
409 415 loads = [self.loads[i] for i in indices]
410 416 else:
411 417 loads = self.loads
412 418 idx = self.scheme(loads)
413 419 if indices:
414 420 idx = indices[idx]
415 421 target = self.targets[idx]
416 422 # print (target, map(str, msg[:3]))
417 423 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
418 424 self.engine_stream.send_multipart(raw_msg, copy=False)
419 425 self.add_job(idx)
420 426 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
421 427 content = dict(msg_id=msg_id, engine_id=target)
422 428 self.session.send(self.mon_stream, 'task_destination', content=content,
423 429 ident=['tracktask',self.session.session])
424 430
425 431 #-----------------------------------------------------------------------
426 432 # Result Handling
427 433 #-----------------------------------------------------------------------
428 434 @logged
429 435 def dispatch_result(self, raw_msg):
430 436 """dispatch method for result replies"""
431 437 try:
432 438 idents,msg = self.session.feed_identities(raw_msg, copy=False)
433 439 msg = self.session.unpack_message(msg, content=False, copy=False)
434 440 except:
435 441             self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
436 442 return
437 443
438 444 header = msg['header']
439 445 if header.get('dependencies_met', True):
440 446 success = (header['status'] == 'ok')
441 447 self.handle_result(idents, msg['parent_header'], raw_msg, success)
442 448 # send to Hub monitor
443 449 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
444 450 else:
445 451 self.handle_unmet_dependency(idents, msg['parent_header'])
446 452
447 453 @logged
448 454 def handle_result(self, idents, parent, raw_msg, success=True):
449 455 """handle a real task result, either success or failure"""
450 456 # first, relay result to client
451 457 engine = idents[0]
452 458 client = idents[1]
453 459 # swap_ids for XREP-XREP mirror
454 460 raw_msg[:2] = [client,engine]
455 461 # print (map(str, raw_msg[:4]))
456 462 self.client_stream.send_multipart(raw_msg, copy=False)
457 463 # now, update our data structures
458 464 msg_id = parent['msg_id']
459 465 self.blacklist.pop(msg_id, None)
460 466 self.pending[engine].pop(msg_id)
461 467 if success:
462 468 self.completed[engine].add(msg_id)
463 469 self.all_completed.add(msg_id)
464 470 else:
465 471 self.failed[engine].add(msg_id)
466 472 self.all_failed.add(msg_id)
467 473 self.all_done.add(msg_id)
468 474 self.destinations[msg_id] = engine
469 475
470 476 self.update_graph(msg_id, success)
471 477
472 478 @logged
473 479 def handle_unmet_dependency(self, idents, parent):
474 480 """handle an unmet dependency"""
475 481 engine = idents[0]
476 482 msg_id = parent['msg_id']
477 483
478 484 if msg_id not in self.blacklist:
479 485 self.blacklist[msg_id] = set()
480 486 self.blacklist[msg_id].add(engine)
481 487
482 488 args = self.pending[engine].pop(msg_id)
483 489 raw,targets,after,follow,timeout = args
484 490
485 491 if self.blacklist[msg_id] == targets:
486 492 self.depending[msg_id] = args
487 493 return self.fail_unreachable(msg_id)
488 494
489 495 elif not self.maybe_run(msg_id, *args):
490 496 # resubmit failed, put it back in our dependency tree
491 497 self.save_unmet(msg_id, *args)
492 498
493 499
494 500 @logged
495 501 def update_graph(self, dep_id, success=True):
496 502 """dep_id just finished. Update our dependency
497 503         graph and submit any jobs that just became runnable."""
498 504 # print ("\n\n***********")
499 505 # pprint (dep_id)
500 506 # pprint (self.graph)
501 507 # pprint (self.depending)
502 508 # pprint (self.all_completed)
503 509 # pprint (self.all_failed)
504 510 # print ("\n\n***********\n\n")
505 511 if dep_id not in self.graph:
506 512 return
507 513 jobs = self.graph.pop(dep_id)
508 514
509 515 for msg_id in jobs:
510 516 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
511 517 # if dep_id in after:
512 518 # if after.all and (success or not after.success_only):
513 519 # after.remove(dep_id)
514 520
515 521 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
516 522 self.fail_unreachable(msg_id)
517 523
518 524 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
519 525 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
520 526
521 527 self.depending.pop(msg_id)
522 528 for mid in follow.union(after):
523 529 if mid in self.graph:
524 530 self.graph[mid].remove(msg_id)
525 531
526 532 #----------------------------------------------------------------------
527 533 # methods to be overridden by subclasses
528 534 #----------------------------------------------------------------------
529 535
530 536 def add_job(self, idx):
531 537 """Called after self.targets[idx] just got the job with header.
532 538         Override in subclasses. The default ordering is simple LRU.
533 539 The default loads are the number of outstanding jobs."""
534 540 self.loads[idx] += 1
535 541 for lis in (self.targets, self.loads):
536 542 lis.append(lis.pop(idx))
537 543
538 544
539 545 def finish_job(self, idx):
540 546 """Called after self.targets[idx] just finished a job.
541 547         Override in subclasses."""
542 548 self.loads[idx] -= 1
543 549
544 550
545 551
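The default add_job/finish_job policy above is plain least-recently-used bookkeeping: bump the engine's outstanding-job count, then rotate it to the back of both lists. A standalone sketch of that behavior, using throwaway lists rather than real scheduler state:

# sketch of the default LRU bookkeeping; the names are illustrative only
targets = ['engine0', 'engine1', 'engine2']
loads = [0, 0, 0]

def add_job(idx):
    # one more outstanding job, then rotate this engine to the back
    loads[idx] += 1
    for lis in (targets, loads):
        lis.append(lis.pop(idx))

add_job(0)
assert targets == ['engine1', 'engine2', 'engine0']
assert loads == [0, 0, 1]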
546 552 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
547 553 log_addr=None, loglevel=logging.DEBUG, scheme='lru',
548 554 identity=b'task'):
549 555 from zmq.eventloop import ioloop
550 556 from zmq.eventloop.zmqstream import ZMQStream
551 557
552 558 ctx = zmq.Context()
553 559 loop = ioloop.IOLoop()
554 560 print (in_addr, out_addr, mon_addr, not_addr)
555 561 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
556 562 ins.setsockopt(zmq.IDENTITY, identity)
557 563 ins.bind(in_addr)
558 564
559 565 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
560 566 outs.setsockopt(zmq.IDENTITY, identity)
561 567 outs.bind(out_addr)
562 568 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
563 569 mons.connect(mon_addr)
564 570 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
565 571 nots.setsockopt(zmq.SUBSCRIBE, '')
566 572 nots.connect(not_addr)
567 573
568 574 scheme = globals().get(scheme, None)
569 575 # setup logging
570 576 if log_addr:
571 577 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
572 578 else:
573 579 local_logger(logname, loglevel)
574 580
575 581 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
576 582 mon_stream=mons, notifier_stream=nots,
577 583 scheme=scheme, loop=loop, logname=logname,
578 584 config=config)
579 585 scheduler.start()
580 586 try:
581 587 loop.start()
582 588 except KeyboardInterrupt:
583 589 print ("interrupted, exiting...", file=sys.__stderr__)
584 590
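launch_scheduler wires up the four streams and then blocks in loop.start(), so it is intended to run in a dedicated process. A hedged sketch of a local invocation; the addresses and ports below are invented for illustration, not taken from any configuration in this code:

# hypothetical addresses; every port here is an arbitrary choice
launch_scheduler('tcp://127.0.0.1:10201',  # in_addr: XREP bound for clients
                 'tcp://127.0.0.1:10202',  # out_addr: XREP bound for engines
                 'tcp://127.0.0.1:10203',  # mon_addr: PUB connected to the monitor
                 'tcp://127.0.0.1:10204',  # not_addr: SUB for registration events
                 scheme='lru')             # resolved via globals() above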
@@ -1,487 +1,493 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
5 11
6 12 #-----------------------------------------------------------------------------
7 13 # Imports
8 14 #-----------------------------------------------------------------------------
9 15
10 16 # Standard library imports.
11 17 from __future__ import print_function
12 18
13 19 import sys
14 20 import time
15 21
16 22 from code import CommandCompiler
17 23 from datetime import datetime
18 24 from pprint import pprint
19 25 from signal import SIGTERM, SIGKILL
20 26
21 27 # System library imports.
22 28 import zmq
23 29 from zmq.eventloop import ioloop, zmqstream
24 30
25 31 # Local imports.
26 32 from IPython.core import ultratb
27 33 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
28 34 from IPython.zmq.completer import KernelCompleter
29 35 from IPython.zmq.iostream import OutStream
30 36 from IPython.zmq.displayhook import DisplayHook
31 37
32 38 from . import heartmonitor
33 39 from .client import Client
34 40 from .error import wrap_exception
35 41 from .factory import SessionFactory
36 42 from .streamsession import StreamSession
37 43 from .util import serialize_object, unpack_apply_message, ISO8601, Namespace
38 44
39 45 def printer(*args):
40 46 pprint(args, stream=sys.__stdout__)
41 47
42 48
43 49 class _Passer:
44 50 """Empty class that implements `send()` that does nothing."""
45 51 def send(self, *args, **kwargs):
46 52 pass
47 53 send_multipart = send
48 54
49 55
50 56 #-----------------------------------------------------------------------------
51 57 # Main kernel class
52 58 #-----------------------------------------------------------------------------
53 59
54 60 class Kernel(SessionFactory):
55 61
56 62 #---------------------------------------------------------------------------
57 63 # Kernel interface
58 64 #---------------------------------------------------------------------------
59 65
60 66 # kwargs:
61 67 int_id = Int(-1, config=True)
62 68 user_ns = Dict(config=True)
63 69 exec_lines = List(config=True)
64 70
65 71 control_stream = Instance(zmqstream.ZMQStream)
66 72 task_stream = Instance(zmqstream.ZMQStream)
67 73 iopub_stream = Instance(zmqstream.ZMQStream)
68 74 client = Instance('IPython.zmq.parallel.client.Client')
69 75
70 76 # internals
71 77 shell_streams = List()
72 78 compiler = Instance(CommandCompiler, (), {})
73 79 completer = Instance(KernelCompleter)
74 80
75 81 aborted = Set()
76 82 shell_handlers = Dict()
77 83 control_handlers = Dict()
78 84
79 85 def _set_prefix(self):
80 86 self.prefix = "engine.%s"%self.int_id
81 87
82 88 def _connect_completer(self):
83 89 self.completer = KernelCompleter(self.user_ns)
84 90
85 91 def __init__(self, **kwargs):
86 92 super(Kernel, self).__init__(**kwargs)
87 93 self._set_prefix()
88 94 self._connect_completer()
89 95
90 96 self.on_trait_change(self._set_prefix, 'id')
91 97 self.on_trait_change(self._connect_completer, 'user_ns')
92 98
93 99 # Build dict of handlers for message types
94 100 for msg_type in ['execute_request', 'complete_request', 'apply_request',
95 101 'clear_request']:
96 102 self.shell_handlers[msg_type] = getattr(self, msg_type)
97 103
98 104 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
99 105 self.control_handlers[msg_type] = getattr(self, msg_type)
100 106
101 107 self._initial_exec_lines()
102 108
103 109 def _wrap_exception(self, method=None):
104 110 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
105 111 content=wrap_exception(e_info)
106 112 return content
107 113
108 114 def _initial_exec_lines(self):
109 115 s = _Passer()
110 116         content = dict(silent=True, user_variables=[], user_expressions=[])
111 117 for line in self.exec_lines:
112 118 self.log.debug("executing initialization: %s"%line)
113 119 content.update({'code':line})
114 120 msg = self.session.msg('execute_request', content)
115 121 self.execute_request(s, [], msg)
116 122
117 123
118 124 #-------------------- control handlers -----------------------------
119 125 def abort_queues(self):
120 126 for stream in self.shell_streams:
121 127 if stream:
122 128 self.abort_queue(stream)
123 129
124 130 def abort_queue(self, stream):
125 131 while True:
126 132 try:
127 133 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
128 134 except zmq.ZMQError as e:
129 135 if e.errno == zmq.EAGAIN:
130 136 break
131 137 else:
132 138 return
133 139 else:
134 140 if msg is None:
135 141 return
136 142 else:
137 143 idents,msg = msg
138 144
139 145                 # assert self.reply_socket.rcvmore(), "Unexpected missing message part."
140 146 # msg = self.reply_socket.recv_json()
141 147 self.log.info("Aborting:")
142 148 self.log.info(str(msg))
143 149 msg_type = msg['msg_type']
144 150 reply_type = msg_type.split('_')[0] + '_reply'
145 151 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
146 152 # self.reply_socket.send(ident,zmq.SNDMORE)
147 153 # self.reply_socket.send_json(reply_msg)
148 154 reply_msg = self.session.send(stream, reply_type,
149 155 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
150 156 self.log.debug(str(reply_msg))
151 157 # We need to wait a bit for requests to come in. This can probably
152 158 # be set shorter for true asynchronous clients.
153 159 time.sleep(0.05)
154 160
155 161 def abort_request(self, stream, ident, parent):
156 162 """abort a specifig msg by id"""
157 163 msg_ids = parent['content'].get('msg_ids', None)
158 164 if isinstance(msg_ids, basestring):
159 165 msg_ids = [msg_ids]
160 166 if not msg_ids:
161 167 self.abort_queues()
162 168 for mid in msg_ids:
163 169 self.aborted.add(str(mid))
164 170
165 171 content = dict(status='ok')
166 172 reply_msg = self.session.send(stream, 'abort_reply', content=content,
167 173 parent=parent, ident=ident)[0]
168 174 self.log.debug(str(reply_msg))
169 175
170 176 def shutdown_request(self, stream, ident, parent):
171 177 """kill ourself. This should really be handled in an external process"""
172 178 try:
173 179 self.abort_queues()
174 180 except:
175 181 content = self._wrap_exception('shutdown')
176 182 else:
177 183 content = dict(parent['content'])
178 184 content['status'] = 'ok'
179 185 msg = self.session.send(stream, 'shutdown_reply',
180 186 content=content, parent=parent, ident=ident)
181 187 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
182 188 # content, parent, ident)
183 189 # print >> sys.__stdout__, msg
184 190 # time.sleep(0.2)
185 191 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
186 192 dc.start()
187 193
188 194 def dispatch_control(self, msg):
189 195 idents,msg = self.session.feed_identities(msg, copy=False)
190 196 try:
191 197 msg = self.session.unpack_message(msg, content=True, copy=False)
192 198 except:
193 199 self.log.error("Invalid Message", exc_info=True)
194 200 return
195 201
196 202 header = msg['header']
197 203 msg_id = header['msg_id']
198 204
199 205 handler = self.control_handlers.get(msg['msg_type'], None)
200 206 if handler is None:
201 207 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
202 208 else:
203 209 handler(self.control_stream, idents, msg)
204 210
205 211
206 212 #-------------------- queue helpers ------------------------------
207 213
208 214 def check_dependencies(self, dependencies):
209 215 if not dependencies:
210 216 return True
211 217 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
212 218 anyorall = dependencies[0]
213 219 dependencies = dependencies[1]
214 220 else:
215 221 anyorall = 'all'
216 222 results = self.client.get_results(dependencies,status_only=True)
217 223 if results['status'] != 'ok':
218 224 return False
219 225
220 226 if anyorall == 'any':
221 227 if not results['completed']:
222 228 return False
223 229 else:
224 230 if results['pending']:
225 231 return False
226 232
227 233 return True
228 234
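check_dependencies accepts either a flat list of msg_ids, which is treated as 'all', or a two-element ['any'|'all', msg_ids] pair. The shapes it understands, with placeholder ids:

# dependency shapes accepted by check_dependencies; the ids are placeholders
deps_implicit_all = ['msg-aaa', 'msg-bbb']            # plain list: all must be done
deps_explicit_all = ['all', ['msg-aaa', 'msg-bbb']]   # none may still be pending
deps_any          = ['any', ['msg-aaa', 'msg-bbb']]   # one completed result suffices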
229 235 def check_aborted(self, msg_id):
230 236 return msg_id in self.aborted
231 237
232 238 #-------------------- queue handlers -----------------------------
233 239
234 240 def clear_request(self, stream, idents, parent):
235 241 """Clear our namespace."""
236 242 self.user_ns = {}
237 243 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
238 244 content = dict(status='ok'))
239 245 self._initial_exec_lines()
240 246
241 247 def execute_request(self, stream, ident, parent):
242 248 self.log.debug('execute request %s'%parent)
243 249 try:
244 250 code = parent[u'content'][u'code']
245 251 except:
246 252 self.log.error("Got bad msg: %s"%parent, exc_info=True)
247 253 return
248 254 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
249 255 ident='%s.pyin'%self.prefix)
250 256 started = datetime.now().strftime(ISO8601)
251 257 try:
252 258 comp_code = self.compiler(code, '<zmq-kernel>')
253 259 # allow for not overriding displayhook
254 260 if hasattr(sys.displayhook, 'set_parent'):
255 261 sys.displayhook.set_parent(parent)
256 262 sys.stdout.set_parent(parent)
257 263 sys.stderr.set_parent(parent)
258 264 exec comp_code in self.user_ns, self.user_ns
259 265 except:
260 266 exc_content = self._wrap_exception('execute')
261 267 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
262 268 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
263 269 ident='%s.pyerr'%self.prefix)
264 270 reply_content = exc_content
265 271 else:
266 272 reply_content = {'status' : 'ok'}
267 273
268 274 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
269 275 ident=ident, subheader = dict(started=started))
270 276 self.log.debug(str(reply_msg))
271 277 if reply_msg['content']['status'] == u'error':
272 278 self.abort_queues()
273 279
274 280 def complete_request(self, stream, ident, parent):
275 281 matches = {'matches' : self.complete(parent),
276 282 'status' : 'ok'}
277 283 completion_msg = self.session.send(stream, 'complete_reply',
278 284 matches, parent, ident)
279 285 # print >> sys.__stdout__, completion_msg
280 286
281 287 def complete(self, msg):
282 288 return self.completer.complete(msg.content.line, msg.content.text)
283 289
284 290 def apply_request(self, stream, ident, parent):
285 291 # flush previous reply, so this request won't block it
286 292 stream.flush(zmq.POLLOUT)
287 293
288 294 try:
289 295 content = parent[u'content']
290 296 bufs = parent[u'buffers']
291 297 msg_id = parent['header']['msg_id']
292 298 bound = content.get('bound', False)
293 299 except:
294 300 self.log.error("Got bad msg: %s"%parent, exc_info=True)
295 301 return
296 302 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
297 303 # self.iopub_stream.send(pyin_msg)
298 304 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
299 305 sub = {'dependencies_met' : True, 'engine' : self.ident,
300 306 'started': datetime.now().strftime(ISO8601)}
301 307 try:
302 308 # allow for not overriding displayhook
303 309 if hasattr(sys.displayhook, 'set_parent'):
304 310 sys.displayhook.set_parent(parent)
305 311 sys.stdout.set_parent(parent)
306 312 sys.stderr.set_parent(parent)
307 313 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
308 314 working = self.user_ns
309 315 # suffix =
310 316 prefix = "_"+str(msg_id).replace("-","")+"_"
311 317 # if bound:
312 318 #
313 319 # else:
314 320 # working = dict()
315 321 # suffix = prefix = "_" # prevent keyword collisions with lambda
316 322 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
317 323 if bound:
318 324 bound_ns = Namespace(working)
319 325 args = [bound_ns]+list(args)
320 326 # if f.fun
321 327 fname = getattr(f, '__name__', 'f')
322 328
323 329 fname = prefix+"f"
324 330 argname = prefix+"args"
325 331 kwargname = prefix+"kwargs"
326 332 resultname = prefix+"result"
327 333
328 334 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
329 335 # print ns
330 336 working.update(ns)
331 337 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
332 338 try:
333 339 exec code in working,working
334 340 result = working.get(resultname)
335 341 finally:
336 342 for key in ns.iterkeys():
337 343 working.pop(key)
338 344 if bound:
339 345 working.update(bound_ns)
340 346
341 347 packed_result,buf = serialize_object(result)
342 348 result_buf = [packed_result]+buf
343 349 except:
344 350 exc_content = self._wrap_exception('apply')
345 351 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
346 352 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
347 353 ident='%s.pyerr'%self.prefix)
348 354 reply_content = exc_content
349 355 result_buf = []
350 356
351 357 if exc_content['ename'] == 'UnmetDependency':
352 358 sub['dependencies_met'] = False
353 359 else:
354 360 reply_content = {'status' : 'ok'}
355 361
356 362 # put 'ok'/'error' status in header, for scheduler introspection:
357 363 sub['status'] = reply_content['status']
358 364
359 365 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
360 366 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
361 367
362 368 # if reply_msg['content']['status'] == u'error':
363 369 # self.abort_queues()
364 370
365 371 def dispatch_queue(self, stream, msg):
366 372 self.control_stream.flush()
367 373 idents,msg = self.session.feed_identities(msg, copy=False)
368 374 try:
369 375 msg = self.session.unpack_message(msg, content=True, copy=False)
370 376 except:
371 377 self.log.error("Invalid Message", exc_info=True)
372 378 return
373 379
374 380
375 381 header = msg['header']
376 382 msg_id = header['msg_id']
377 383 if self.check_aborted(msg_id):
378 384 self.aborted.remove(msg_id)
379 385 # is it safe to assume a msg_id will not be resubmitted?
380 386 reply_type = msg['msg_type'].split('_')[0] + '_reply'
381 387 reply_msg = self.session.send(stream, reply_type,
382 388 content={'status' : 'aborted'}, parent=msg, ident=idents)
383 389 return
384 390 handler = self.shell_handlers.get(msg['msg_type'], None)
385 391 if handler is None:
386 392 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
387 393 else:
388 394 handler(stream, idents, msg)
389 395
390 396 def start(self):
391 397 #### stream mode:
392 398 if self.control_stream:
393 399 self.control_stream.on_recv(self.dispatch_control, copy=False)
394 400 self.control_stream.on_err(printer)
395 401
396 402 def make_dispatcher(stream):
397 403 def dispatcher(msg):
398 404 return self.dispatch_queue(stream, msg)
399 405 return dispatcher
400 406
401 407 for s in self.shell_streams:
402 408 s.on_recv(make_dispatcher(s), copy=False)
403 409 s.on_err(printer)
404 410
405 411 if self.iopub_stream:
406 412 self.iopub_stream.on_err(printer)
407 413
408 414 #### while True mode:
409 415 # while True:
410 416 # idle = True
411 417 # try:
412 418 # msg = self.shell_stream.socket.recv_multipart(
413 419 # zmq.NOBLOCK, copy=False)
414 420 # except zmq.ZMQError, e:
415 421 # if e.errno != zmq.EAGAIN:
416 422 # raise e
417 423 # else:
418 424 # idle=False
419 425 # self.dispatch_queue(self.shell_stream, msg)
420 426 #
421 427 # if not self.task_stream.empty():
422 428 # idle=False
423 429 # msg = self.task_stream.recv_multipart()
424 430 # self.dispatch_queue(self.task_stream, msg)
425 431 # if idle:
426 432 # # don't busywait
427 433 # time.sleep(1e-3)
428 434
429 435 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
430 436 client_addr=None, loop=None, context=None, key=None,
431 437 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
432 438 """NO LONGER IN USE"""
433 439 # create loop, context, and session:
434 440 if loop is None:
435 441 loop = ioloop.IOLoop.instance()
436 442 if context is None:
437 443 context = zmq.Context()
438 444 c = context
439 445 session = StreamSession(key=key)
440 446 # print (session.key)
441 447 # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
442 448
443 449 # create Control Stream
444 450 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
445 451 control_stream.setsockopt(zmq.IDENTITY, identity)
446 452 control_stream.connect(control_addr)
447 453
448 454 # create Shell Streams (MUX, Task, etc.):
449 455 shell_streams = []
450 456 for addr in shell_addrs:
451 457 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
452 458 stream.setsockopt(zmq.IDENTITY, identity)
453 459 stream.connect(addr)
454 460 shell_streams.append(stream)
455 461
456 462 # create iopub stream:
457 463 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
458 464 iopub_stream.setsockopt(zmq.IDENTITY, identity)
459 465 iopub_stream.connect(iopub_addr)
460 466
461 467 # Redirect input streams and set a display hook.
462 468 if out_stream_factory:
463 469 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
464 470 sys.stdout.topic = 'engine.%i.stdout'%int_id
465 471 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
466 472 sys.stderr.topic = 'engine.%i.stderr'%int_id
467 473 if display_hook_factory:
468 474 sys.displayhook = display_hook_factory(session, iopub_stream)
469 475 sys.displayhook.topic = 'engine.%i.pyout'%int_id
470 476
471 477
472 478 # launch heartbeat
473 479 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
474 480 heart.start()
475 481
476 482 # create (optional) Client
477 483 if client_addr:
478 484 client = Client(client_addr, username=identity)
479 485 else:
480 486 client = None
481 487
482 488 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
483 489 shell_streams=shell_streams, iopub_stream=iopub_stream,
484 490 client=client, loop=loop)
485 491 kernel.start()
486 492 return loop, c, kernel
487 493
@@ -1,412 +1,418 b''
1 1 #!/usr/bin/env python
2 2 """edited session.py to work with streams, and move msg_type to the header
3 3 """
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2010-2011 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
4 10
5 11
6 12 import os
7 13 import pprint
8 14 import uuid
9 15 from datetime import datetime
10 16
11 17 try:
12 18 import cPickle
13 19 pickle = cPickle
14 20 except ImportError:
15 21 cPickle = None
16 22 import pickle
17 23
18 24 import zmq
19 25 from zmq.utils import jsonapi
20 26 from zmq.eventloop.zmqstream import ZMQStream
21 27
22 28 from .util import ISO8601
23 29
24 30 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
25 31 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
26 32 if json_name in ('jsonlib', 'jsonlib2'):
27 33 use_json = True
28 34 elif json_name:
29 35 if cPickle is None:
30 36 use_json = True
31 37 else:
32 38 use_json = False
33 39 else:
34 40 use_json = False
35 41
36 42 def squash_unicode(obj):
37 43 if isinstance(obj,dict):
38 44 for key in obj.keys():
39 45 obj[key] = squash_unicode(obj[key])
40 46 if isinstance(key, unicode):
41 47 obj[squash_unicode(key)] = obj.pop(key)
42 48 elif isinstance(obj, list):
43 49 for i,v in enumerate(obj):
44 50 obj[i] = squash_unicode(v)
45 51 elif isinstance(obj, unicode):
46 52 obj = obj.encode('utf8')
47 53 return obj
48 54
49 55 json_packer = jsonapi.dumps
50 56 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
51 57
52 58 pickle_packer = lambda o: pickle.dumps(o,-1)
53 59 pickle_unpacker = pickle.loads
54 60
55 61 if use_json:
56 62 default_packer = json_packer
57 63 default_unpacker = json_unpacker
58 64 else:
59 65 default_packer = pickle_packer
60 66 default_unpacker = pickle_unpacker
61 67
62 68
63 69 DELIM="<IDS|MSG>"
64 70
65 71 class Message(object):
66 72 """A simple message object that maps dict keys to attributes.
67 73
68 74 A Message can be created from a dict and a dict from a Message instance
69 75 simply by calling dict(msg_obj)."""
70 76
71 77 def __init__(self, msg_dict):
72 78 dct = self.__dict__
73 79 for k, v in dict(msg_dict).iteritems():
74 80 if isinstance(v, dict):
75 81 v = Message(v)
76 82 dct[k] = v
77 83
78 84 # Having this iterator lets dict(msg_obj) work out of the box.
79 85 def __iter__(self):
80 86 return iter(self.__dict__.iteritems())
81 87
82 88 def __repr__(self):
83 89 return repr(self.__dict__)
84 90
85 91 def __str__(self):
86 92 return pprint.pformat(self.__dict__)
87 93
88 94 def __contains__(self, k):
89 95 return k in self.__dict__
90 96
91 97 def __getitem__(self, k):
92 98 return self.__dict__[k]
93 99
94 100
95 101 def msg_header(msg_id, msg_type, username, session):
96 102 date=datetime.now().strftime(ISO8601)
97 103 return locals()
98 104
99 105 def extract_header(msg_or_header):
100 106 """Given a message or header, return the header."""
101 107 if not msg_or_header:
102 108 return {}
103 109 try:
104 110 # See if msg_or_header is the entire message.
105 111 h = msg_or_header['header']
106 112 except KeyError:
107 113 try:
108 114 # See if msg_or_header is just the header
109 115 h = msg_or_header['msg_id']
110 116 except KeyError:
111 117 raise
112 118 else:
113 119 h = msg_or_header
114 120 if not isinstance(h, dict):
115 121 h = dict(h)
116 122 return h
117 123
118 124 class StreamSession(object):
119 125 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
120 126 debug=False
121 127 key=None
122 128
123 129 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
124 130 if username is None:
125 131 username = os.environ.get('USER','username')
126 132 self.username = username
127 133 if session is None:
128 134 self.session = str(uuid.uuid4())
129 135 else:
130 136 self.session = session
131 137 self.msg_id = str(uuid.uuid4())
132 138 if packer is None:
133 139 self.pack = default_packer
134 140 else:
135 141 if not callable(packer):
136 142 raise TypeError("packer must be callable, not %s"%type(packer))
137 143 self.pack = packer
138 144
139 145 if unpacker is None:
140 146 self.unpack = default_unpacker
141 147 else:
142 148 if not callable(unpacker):
143 149 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
144 150 self.unpack = unpacker
145 151
146 152 if key is not None and keyfile is not None:
147 153 raise TypeError("Must specify key OR keyfile, not both")
148 154 if keyfile is not None:
149 155 with open(keyfile) as f:
150 156 self.key = f.read().strip()
151 157 else:
152 158 self.key = key
153 159 if isinstance(self.key, unicode):
154 160 self.key = self.key.encode('utf8')
155 161 # print key, keyfile, self.key
156 162 self.none = self.pack({})
157 163
158 164 def msg_header(self, msg_type):
159 165 h = msg_header(self.msg_id, msg_type, self.username, self.session)
160 166 self.msg_id = str(uuid.uuid4())
161 167 return h
162 168
163 169 def msg(self, msg_type, content=None, parent=None, subheader=None):
164 170 msg = {}
165 171 msg['header'] = self.msg_header(msg_type)
166 172 msg['msg_id'] = msg['header']['msg_id']
167 173 msg['parent_header'] = {} if parent is None else extract_header(parent)
168 174 msg['msg_type'] = msg_type
169 175 msg['content'] = {} if content is None else content
170 176 sub = {} if subheader is None else subheader
171 177 msg['header'].update(sub)
172 178 return msg
173 179
174 180 def check_key(self, msg_or_header):
175 181 """Check that a message's header has the right key"""
176 182 if self.key is None:
177 183 return True
178 184 header = extract_header(msg_or_header)
179 185 return header.get('key', None) == self.key
180 186
181 187
182 188 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False):
183 189 """Build and send a message via stream or socket.
184 190
185 191 Parameters
186 192 ----------
187 193
188 194 stream : zmq.Socket or ZMQStream
189 195 the socket-like object used to send the data
190 196 msg_or_type : str or Message/dict
191 197 Normally, msg_or_type will be a msg_type unless a message is being sent more
192 198 than once.
193 199
194 200 content : dict or None
195 201 the content of the message (ignored if msg_or_type is a message)
196 202 buffers : list or None
197 203 the already-serialized buffers to be appended to the message
198 204 parent : Message or dict or None
199 205 the parent or parent header describing the parent of this message
200 206 subheader : dict or None
201 207 extra header keys for this message's header
202 208 ident : bytes or list of bytes
203 209 the zmq.IDENTITY routing path
204 210 track : bool
205 211 whether to track. Only for use with Sockets, because ZMQStream objects cannot track messages.
206 212
207 213 Returns
208 214 -------
209 215 msg : message dict
210 216 the constructed message
211 217 (msg,tracker) : (message dict, MessageTracker)
212 218 if track=True, then a 2-tuple will be returned, the first element being the constructed
213 219 message, and the second being the MessageTracker
214 220
215 221 """
216 222
217 223 if not isinstance(stream, (zmq.Socket, ZMQStream)):
218 224 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
219 225 elif track and isinstance(stream, ZMQStream):
220 226 raise TypeError("ZMQStream cannot track messages")
221 227
222 228 if isinstance(msg_or_type, (Message, dict)):
223 229 # we got a Message, not a msg_type
224 230 # don't build a new Message
225 231 msg = msg_or_type
226 232 content = msg['content']
227 233 else:
228 234 msg = self.msg(msg_or_type, content, parent, subheader)
229 235
230 236 buffers = [] if buffers is None else buffers
231 237 to_send = []
232 238 if isinstance(ident, list):
233 239 # accept list of idents
234 240 to_send.extend(ident)
235 241 elif ident is not None:
236 242 to_send.append(ident)
237 243 to_send.append(DELIM)
238 244 if self.key is not None:
239 245 to_send.append(self.key)
240 246 to_send.append(self.pack(msg['header']))
241 247 to_send.append(self.pack(msg['parent_header']))
242 248
243 249 if content is None:
244 250 content = self.none
245 251 elif isinstance(content, dict):
246 252 content = self.pack(content)
247 253 elif isinstance(content, bytes):
248 254 # content is already packed, as in a relayed message
249 255 pass
250 256 else:
251 257 raise TypeError("Content incorrect type: %s"%type(content))
252 258 to_send.append(content)
253 259 flag = 0
254 260 if buffers:
255 261 flag = zmq.SNDMORE
256 262 _track = False
257 263 else:
258 264 _track=track
259 265 if track:
260 266 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
261 267 else:
262 268 tracker = stream.send_multipart(to_send, flag, copy=False)
263 269 for b in buffers[:-1]:
264 270 stream.send(b, flag, copy=False)
265 271 if buffers:
266 272 if track:
267 273 tracker = stream.send(buffers[-1], copy=False, track=track)
268 274 else:
269 275 tracker = stream.send(buffers[-1], copy=False)
270 276
271 277 # omsg = Message(msg)
272 278 if self.debug:
273 279 pprint.pprint(msg)
274 280 pprint.pprint(to_send)
275 281 pprint.pprint(buffers)
276 282
277 283 msg['tracker'] = tracker
278 284
279 285 return msg
280 286
281 287 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
282 288 """Send a raw message via ident path.
283 289
284 290 Parameters
285 291 ----------
286 292 msg : list of sendable buffers"""
287 293 to_send = []
288 294 if isinstance(ident, bytes):
289 295 ident = [ident]
290 296 if ident is not None:
291 297 to_send.extend(ident)
292 298 to_send.append(DELIM)
293 299 if self.key is not None:
294 300 to_send.append(self.key)
295 301 to_send.extend(msg)
296 302         stream.send_multipart(to_send, flags, copy=copy)
297 303
298 304 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
299 305 """receives and unpacks a message
300 306 returns [idents], msg"""
301 307 if isinstance(socket, ZMQStream):
302 308 socket = socket.socket
303 309 try:
304 310 msg = socket.recv_multipart(mode)
305 311 except zmq.ZMQError as e:
306 312 if e.errno == zmq.EAGAIN:
307 313 # We can convert EAGAIN to None as we know in this case
308 314 # recv_multipart won't return None.
309 315 return None
310 316 else:
311 317 raise
312 318 # return an actual Message object
313 319 # determine the number of idents by trying to unpack them.
314 320 # this is terrible:
315 321 idents, msg = self.feed_identities(msg, copy)
316 322 try:
317 323 return idents, self.unpack_message(msg, content=content, copy=copy)
318 324 except Exception as e:
319 325 print (idents, msg)
320 326 # TODO: handle it
321 327 raise e
322 328
323 329 def feed_identities(self, msg, copy=True):
324 330 """feed until DELIM is reached, then return the prefix as idents and remainder as
325 331 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
326 332
327 333 Parameters
328 334 ----------
329 335 msg : a list of Message or bytes objects
330 336 the message to be split
331 337 copy : bool
332 338 flag determining whether the arguments are bytes or Messages
333 339
334 340 Returns
335 341 -------
336 342 (idents,msg) : two lists
337 343             idents will always be a list of bytes - the identity prefix
338 344 msg will be a list of bytes or Messages, unchanged from input
339 345 msg should be unpackable via self.unpack_message at this point.
340 346 """
341 347 ikey = int(self.key is not None)
342 348 minlen = 3 + ikey
343 349 msg = list(msg)
344 350 idents = []
345 351 while len(msg) > minlen:
346 352 if copy:
347 353 s = msg[0]
348 354 else:
349 355 s = msg[0].bytes
350 356 if s == DELIM:
351 357 msg.pop(0)
352 358 break
353 359 else:
354 360 idents.append(s)
355 361 msg.pop(0)
356 362
357 363 return idents, msg
358 364
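On the wire a message is ident frames, the DELIM sentinel, an optional key, then the header/parent/content frames; feed_identities only splits at the sentinel. A small sketch with copy=True and no key configured:

# split routing idents from the payload frames (key is None here)
session = StreamSession(username='test')
wire = ['client.0', 'engine.1', DELIM, 'header', 'parent', 'content']
idents, rest = session.feed_identities(wire, copy=True)
assert idents == ['client.0', 'engine.1']
assert rest == ['header', 'parent', 'content']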
359 365 def unpack_message(self, msg, content=True, copy=True):
360 366 """Return a message object from the format
361 367 sent by self.send.
362 368
363 369         Parameters
364 370         ----------
365 371
366 372 content : bool (True)
367 373 whether to unpack the content dict (True),
368 374 or leave it serialized (False)
369 375
370 376 copy : bool (True)
371 377 whether to return the bytes (True),
372 378 or the non-copying Message object in each place (False)
373 379
374 380 """
375 381 ikey = int(self.key is not None)
376 382 minlen = 3 + ikey
377 383 message = {}
378 384 if not copy:
379 385 for i in range(minlen):
380 386 msg[i] = msg[i].bytes
381 387 if ikey:
382 388 if not self.key == msg[0]:
383 389 raise KeyError("Invalid Session Key: %s"%msg[0])
384 390 if not len(msg) >= minlen:
385 391 raise TypeError("malformed message, must have at least %i elements"%minlen)
386 392 message['header'] = self.unpack(msg[ikey+0])
387 393 message['msg_type'] = message['header']['msg_type']
388 394 message['parent_header'] = self.unpack(msg[ikey+1])
389 395 if content:
390 396 message['content'] = self.unpack(msg[ikey+2])
391 397 else:
392 398 message['content'] = msg[ikey+2]
393 399
394 400 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
395 401 return message
396 402
397 403
398 404 def test_msg2obj():
399 405 am = dict(x=1)
400 406 ao = Message(am)
401 407 assert ao.x == am['x']
402 408
403 409 am['y'] = dict(z=1)
404 410 ao = Message(am)
405 411 assert ao.y.z == am['y']['z']
406 412
407 413 k1, k2 = 'y', 'z'
408 414 assert ao[k1][k2] == am[k1][k2]
409 415
410 416 am2 = dict(ao)
411 417 assert am['x'] == am2['x']
412 418 assert am['y']['z'] == am2['y']['z']
@@ -1,100 +1,106 b''
1 1 """Thread for popping Tasks from zmq to Python Queue"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010-2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
2 8
3 9
4 10 import time
5 11 from threading import Thread
6 12
7 13 try:
8 14 from queue import Queue
9 15 except ImportError:
10 16 from Queue import Queue
11 17
12 18 import zmq
13 19 from zmq.core.poll import _poll as poll
14 20 from zmq.devices import ThreadDevice
15 21 from IPython.zmq.parallel import streamsession as ss
16 22
17 23
18 24 class QueueStream(object):
19 25 def __init__(self, in_queue, out_queue):
20 26 self.in_queue = in_queue
21 27 self.out_queue = out_queue
22 28
23 29 def send_multipart(self, *args, **kwargs):
24 30 while self.out_queue.full():
25 31 time.sleep(1e-3)
26 32 self.out_queue.put(('send_multipart', args, kwargs))
27 33
28 34 def send(self, *args, **kwargs):
29 35 while self.out_queue.full():
30 36 time.sleep(1e-3)
31 37 self.out_queue.put(('send', args, kwargs))
32 38
33 39 def recv_multipart(self):
34 40 return self.in_queue.get()
35 41
36 42 def empty(self):
37 43 return self.in_queue.empty()
38 44
39 45 class TaskThread(ThreadDevice):
40 46 """Class for popping Tasks from C-ZMQ->Python Queue"""
41 47 max_qsize = 100
42 48 in_socket = None
43 49 out_socket = None
44 50 # queue = None
45 51
46 52 def __init__(self, queue_type, mon_type, engine_id, max_qsize=100):
47 53 ThreadDevice.__init__(self, 0, queue_type, mon_type)
48 54 self.session = ss.StreamSession(username='TaskNotifier[%s]'%engine_id)
49 55 self.engine_id = engine_id
50 56 self.in_queue = Queue(max_qsize)
51 57 self.out_queue = Queue(max_qsize)
52 58 self.max_qsize = max_qsize
53 59
54 60 @property
55 61 def queues(self):
56 62 return self.in_queue, self.out_queue
57 63
58 64 @property
59 65 def can_recv(self):
60 66 # print self.in_queue.full(), poll((self.queue_socket, zmq.POLLIN),1e-3)
61 67 return (not self.in_queue.full()) and poll([(self.queue_socket, zmq.POLLIN)], 1e-3 )
62 68
63 69 @property
64 70 def can_send(self):
65 71 return not self.out_queue.empty()
66 72
67 73 def run(self):
68 74 print 'running'
69 75 self.queue_socket,self.mon_socket = self._setup_sockets()
70 76 print 'setup'
71 77
72 78 while True:
73 79 while not self.can_send and not self.can_recv:
74 80 # print 'idle'
75 81 # nothing to do, wait
76 82 time.sleep(1e-3)
77 83 while self.can_send:
78 84 # flush out queue
79 85 print 'flushing...'
80 86 meth, args, kwargs = self.out_queue.get()
81 87 getattr(self.queue_socket, meth)(*args, **kwargs)
82 88 print 'flushed'
83 89
84 90 if self.can_recv:
85 91 print 'recving'
86 92 # get another job from zmq
87 93 msg = self.queue_socket.recv_multipart(0, copy=False)
88 94 # put it in the Queue
89 95 self.in_queue.put(msg)
90 96 idents,msg = self.session.feed_identities(msg, copy=False)
91 97 msg = self.session.unpack_message(msg, content=False, copy=False)
92 98 # notify the Controller that we got it
93 99 self.mon_socket.send('tracktask', zmq.SNDMORE)
94 100 header = msg['header']
95 101 msg_id = header['msg_id']
96 102 content = dict(engine_id=self.engine_id, msg_id = msg_id)
97 103 self.session.send(self.mon_socket, 'task_receipt', content=content)
98 104 print 'recvd'
99 105
100 106 No newline at end of file
@@ -1,299 +1,318 b''
1 1 """some generic utilities for dealing with classes, urls, and serialization"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010-2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
8
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
12
2 13 import re
3 14 import socket
4 15
5 16 try:
6 17 import cPickle
7 18 pickle = cPickle
8 19 except ImportError:
9 20 cPickle = None
10 21 import pickle
11 22
12 23
13 24 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
14 25 from IPython.utils.newserialized import serialize, unserialize
15 26
16 27 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
17 28
29 #-----------------------------------------------------------------------------
30 # Classes
31 #-----------------------------------------------------------------------------
32
18 33 class Namespace(dict):
19 34 """Subclass of dict for attribute access to keys."""
20 35
21 36 def __getattr__(self, key):
22 37 """getattr aliased to getitem"""
23 38         if key in self:
24 39 return self[key]
25 40 else:
26 41 raise NameError(key)
27 42
28 43 def __setattr__(self, key, value):
29 44 """setattr aliased to setitem, with strict"""
30 45 if hasattr(dict, key):
31 46 raise KeyError("Cannot override dict keys %r"%key)
32 47 self[key] = value
33 48
34 49
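Namespace simply aliases attribute access to item access, refusing keys that would shadow dict methods:

ns = Namespace()
ns.a = 5
assert ns['a'] == 5 and ns.a == 5
try:
    ns.keys = 'nope'          # would shadow dict.keys
except KeyError:
    pass                      # rejected, as intended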
35 50 class ReverseDict(dict):
36 51 """simple double-keyed subset of dict methods."""
37 52
38 53 def __init__(self, *args, **kwargs):
39 54 dict.__init__(self, *args, **kwargs)
40 55 self._reverse = dict()
41 56 for key, value in self.iteritems():
42 57 self._reverse[value] = key
43 58
44 59 def __getitem__(self, key):
45 60 try:
46 61 return dict.__getitem__(self, key)
47 62 except KeyError:
48 63 return self._reverse[key]
49 64
50 65 def __setitem__(self, key, value):
51 66 if key in self._reverse:
52 67 raise KeyError("Can't have key %r on both sides!"%key)
53 68 dict.__setitem__(self, key, value)
54 69 self._reverse[value] = key
55 70
56 71 def pop(self, key):
57 72 value = dict.pop(self, key)
58 73 self._reverse.pop(value)
59 74 return value
60 75
61 76 def get(self, key, default=None):
62 77 try:
63 78 return self[key]
64 79 except KeyError:
65 80 return default
66 81
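ReverseDict maintains a value-to-key mirror, so lookups fall through to the reverse mapping:

rd = ReverseDict()
rd['engine.0'] = 'uuid-1234'
assert rd['engine.0'] == 'uuid-1234'
assert rd['uuid-1234'] == 'engine.0'   # reverse lookup
rd.pop('engine.0')                     # removes both directions
assert rd.get('uuid-1234') is None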
82 #-----------------------------------------------------------------------------
83 # Functions
84 #-----------------------------------------------------------------------------
85
67 86 def validate_url(url):
68 87 """validate a url for zeromq"""
69 88 if not isinstance(url, basestring):
70 89 raise TypeError("url must be a string, not %r"%type(url))
71 90 url = url.lower()
72 91
73 92 proto_addr = url.split('://')
74 93 assert len(proto_addr) == 2, 'Invalid url: %r'%url
75 94 proto, addr = proto_addr
76 95 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
77 96
78 97 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
79 98 # author: Remi Sabourin
80 99 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
81 100
82 101 if proto == 'tcp':
83 102 lis = addr.split(':')
84 103 assert len(lis) == 2, 'Invalid url: %r'%url
85 104 addr,s_port = lis
86 105 try:
87 106 port = int(s_port)
88 107 except ValueError:
89 108             raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
90 109
91 110 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
92 111
93 112 else:
94 113 # only validate tcp urls currently
95 114 pass
96 115
97 116 return True
98 117
99 118
100 119 def validate_url_container(container):
101 120 """validate a potentially nested collection of urls."""
102 121 if isinstance(container, basestring):
103 122 url = container
104 123 return validate_url(url)
105 124 elif isinstance(container, dict):
106 125 container = container.itervalues()
107 126
108 127 for element in container:
109 128 validate_url_container(element)
110 129
111 130
112 131 def split_url(url):
113 132 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
114 133 proto_addr = url.split('://')
115 134 assert len(proto_addr) == 2, 'Invalid url: %r'%url
116 135 proto, addr = proto_addr
117 136 lis = addr.split(':')
118 137 assert len(lis) == 2, 'Invalid url: %r'%url
119 138 addr,s_port = lis
120 139 return proto,addr,s_port
121 140
122 141 def disambiguate_ip_address(ip, location=None):
123 142 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
124 143 ones, based on the location (default interpretation of location is localhost)."""
125 144 if ip in ('0.0.0.0', '*'):
126 145 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
127 146 if location is None or location in external_ips:
128 147 ip='127.0.0.1'
129 148 elif location:
130 149 return location
131 150 return ip
132 151
133 152 def disambiguate_url(url, location=None):
134 153 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
135 154 ones, based on the location (default interpretation is localhost).
136 155
137 156 This is for zeromq urls, such as tcp://*:10101."""
138 157 try:
139 158 proto,ip,port = split_url(url)
140 159 except AssertionError:
141 160 # probably not tcp url; could be ipc, etc.
142 161 return url
143 162
144 163 ip = disambiguate_ip_address(ip,location)
145 164
146 165 return "%s://%s:%s"%(proto,ip,port)
147 166
148 167
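disambiguate_url only rewrites wildcard tcp hosts; anything it cannot parse as tcp passes through unchanged. For example, assuming 10.0.0.5 is not one of the local machine's interfaces:

assert disambiguate_url('tcp://*:10101') == 'tcp://127.0.0.1:10101'
assert disambiguate_url('tcp://*:10101', '10.0.0.5') == 'tcp://10.0.0.5:10101'
assert disambiguate_url('ipc:///tmp/sock') == 'ipc:///tmp/sock'  # not tcp: untouched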
149 168 def rekey(dikt):
150 169 """Rekey a dict that has been forced to use str keys where there should be
151 170 ints by json. This belongs in the jsonutil added by fperez."""
152 171     for k in dikt.keys(): # list copy, since we mutate dikt in the loop
153 172 if isinstance(k, str):
154 173 ik=fk=None
155 174 try:
156 175 ik = int(k)
157 176 except ValueError:
158 177 try:
159 178 fk = float(k)
160 179 except ValueError:
161 180 continue
162 181 if ik is not None:
163 182 nk = ik
164 183 else:
165 184 nk = fk
166 185 if nk in dikt:
167 186 raise KeyError("already have key %r"%nk)
168 187 dikt[nk] = dikt.pop(k)
169 188 return dikt
170 189
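rekey undoes json's stringification of numeric keys, leaving genuine string keys alone:

d = rekey({'0': 'a', '1.5': 'b', 'name': 'c'})
assert d == {0: 'a', 1.5: 'b', 'name': 'c'}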
171 190 def serialize_object(obj, threshold=64e-6):
172 191 """Serialize an object into a list of sendable buffers.
173 192
174 193 Parameters
175 194 ----------
176 195
177 196 obj : object
178 197 The object to be serialized
179 198 threshold : float
180 199 The threshold for not double-pickling the content.
181 200
182 201
183 202 Returns
184 203 -------
185 204 ('pmd', [bufs]) :
186 205 where pmd is the pickled metadata wrapper,
187 206 bufs is a list of data buffers
188 207 """
189 208 databuffers = []
190 209 if isinstance(obj, (list, tuple)):
191 210 clist = canSequence(obj)
192 211 slist = map(serialize, clist)
193 212 for s in slist:
194 213 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
195 214 databuffers.append(s.getData())
196 215 s.data = None
197 216 return pickle.dumps(slist,-1), databuffers
198 217 elif isinstance(obj, dict):
199 218 sobj = {}
200 219 for k in sorted(obj.iterkeys()):
201 220 s = serialize(can(obj[k]))
202 221 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
203 222 databuffers.append(s.getData())
204 223 s.data = None
205 224 sobj[k] = s
206 225 return pickle.dumps(sobj,-1),databuffers
207 226 else:
208 227 s = serialize(can(obj))
209 228 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
210 229 databuffers.append(s.getData())
211 230 s.data = None
212 231 return pickle.dumps(s,-1),databuffers
213 232
214 233
215 234 def unserialize_object(bufs):
216 235 """reconstruct an object serialized by serialize_object from data buffers."""
217 236 bufs = list(bufs)
218 237 sobj = pickle.loads(bufs.pop(0))
219 238 if isinstance(sobj, (list, tuple)):
220 239 for s in sobj:
221 240 if s.data is None:
222 241 s.data = bufs.pop(0)
223 242 return uncanSequence(map(unserialize, sobj)), bufs
224 243 elif isinstance(sobj, dict):
225 244 newobj = {}
226 245 for k in sorted(sobj.iterkeys()):
227 246 s = sobj[k]
228 247 if s.data is None:
229 248 s.data = bufs.pop(0)
230 249 newobj[k] = uncan(unserialize(s))
231 250 return newobj, bufs
232 251 else:
233 252 if sobj.data is None:
234 253 sobj.data = bufs.pop(0)
235 254 return uncan(unserialize(sobj)), bufs
236 255
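serialize_object returns the pickled metadata plus any extracted data buffers, and unserialize_object expects them back as a single list. A round-trip sketch, assuming the IPython.utils serialization helpers imported above are available:

# round-trip a small container; no buffers get extracted at this size
pmd, bufs = serialize_object(['hello', {'n': 42}])
obj, remainder = unserialize_object([pmd] + bufs)
assert obj == ['hello', {'n': 42}]
assert remainder == []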
237 256 def pack_apply_message(f, args, kwargs, threshold=64e-6):
238 257 """pack up a function, args, and kwargs to be sent over the wire
239 258 as a series of buffers. Any object whose data is larger than `threshold`
240 259 will not have their data copied (currently only numpy arrays support zero-copy)"""
241 260 msg = [pickle.dumps(can(f),-1)]
242 261 databuffers = [] # for large objects
243 262 sargs, bufs = serialize_object(args,threshold)
244 263 msg.append(sargs)
245 264 databuffers.extend(bufs)
246 265 skwargs, bufs = serialize_object(kwargs,threshold)
247 266 msg.append(skwargs)
248 267 databuffers.extend(bufs)
249 268 msg.extend(databuffers)
250 269 return msg
251 270
252 271 def unpack_apply_message(bufs, g=None, copy=True):
253 272 """unpack f,args,kwargs from buffers packed by pack_apply_message()
254 273 Returns: original f,args,kwargs"""
255 274 bufs = list(bufs) # allow us to pop
256 275 assert len(bufs) >= 3, "not enough buffers!"
257 276 if not copy:
258 277 for i in range(3):
259 278 bufs[i] = bufs[i].bytes
260 279 cf = pickle.loads(bufs.pop(0))
261 280 sargs = list(pickle.loads(bufs.pop(0)))
262 281 skwargs = dict(pickle.loads(bufs.pop(0)))
263 282 # print sargs, skwargs
264 283 f = uncan(cf, g)
265 284 for sa in sargs:
266 285 if sa.data is None:
267 286 m = bufs.pop(0)
268 287 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
269 288 if copy:
270 289 sa.data = buffer(m)
271 290 else:
272 291 sa.data = m.buffer
273 292 else:
274 293 if copy:
275 294 sa.data = m
276 295 else:
277 296 sa.data = m.bytes
278 297
279 298 args = uncanSequence(map(unserialize, sargs), g)
280 299 kwargs = {}
281 300 for k in sorted(skwargs.iterkeys()):
282 301 sa = skwargs[k]
283 302 if sa.data is None:
284 303 m = bufs.pop(0)
285 304 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
286 305 if copy:
287 306 sa.data = buffer(m)
288 307 else:
289 308 sa.data = m.buffer
290 309 else:
291 310 if copy:
292 311 sa.data = m
293 312 else:
294 313 sa.data = m.bytes
295 314
296 315 kwargs[k] = uncan(unserialize(sa), g)
297 316
298 317 return f,args,kwargs
299 318