copyright statements
MinRK
@@ -1,322 +1,322 @@
 """AsyncResult objects for the client"""
 #-----------------------------------------------------------------------------
-# Copyright (C) 2010 The IPython Development Team
+# Copyright (C) 2010-2011 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------
 
 #-----------------------------------------------------------------------------
 # Imports
 #-----------------------------------------------------------------------------
 
 import time
 
 from IPython.external.decorator import decorator
 from . import error
 
 #-----------------------------------------------------------------------------
 # Classes
 #-----------------------------------------------------------------------------
 
 @decorator
 def check_ready(f, self, *args, **kwargs):
     """Call spin() to sync state prior to calling the method."""
     self.wait(0)
     if not self._ready:
         raise error.TimeoutError("result not ready")
     return f(self, *args, **kwargs)
 
 class AsyncResult(object):
     """Class for representing results of non-blocking calls.
 
     Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
     """
 
     msg_ids = None
     _targets = None
     _tracker = None
 
     def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
         self._client = client
         if isinstance(msg_ids, basestring):
             msg_ids = [msg_ids]
         self.msg_ids = msg_ids
         self._fname=fname
         self._targets = targets
         self._tracker = tracker
         self._ready = False
         self._success = None
         self._single_result = len(msg_ids) == 1
 
     def __repr__(self):
         if self._ready:
             return "<%s: finished>"%(self.__class__.__name__)
         else:
             return "<%s: %s>"%(self.__class__.__name__,self._fname)
 
 
     def _reconstruct_result(self, res):
         """Reconstruct our result from actual result list (always a list)
 
         Override me in subclasses for turning a list of results
         into the expected form.
         """
         if self._single_result:
             return res[0]
         else:
             return res
 
     def get(self, timeout=-1):
         """Return the result when it arrives.
 
         If `timeout` is not ``None`` and the result does not arrive within
         `timeout` seconds then ``TimeoutError`` is raised. If the
         remote call raised an exception then that exception will be reraised
         by get() inside a `RemoteError`.
         """
         if not self.ready():
             self.wait(timeout)
 
         if self._ready:
             if self._success:
                 return self._result
             else:
                 raise self._exception
         else:
             raise error.TimeoutError("Result not ready.")
 
     def ready(self):
         """Return whether the call has completed."""
         if not self._ready:
             self.wait(0)
         return self._ready
 
     def wait(self, timeout=-1):
         """Wait until the result is available or until `timeout` seconds pass.
 
         This method always returns None.
         """
         if self._ready:
             return
         self._ready = self._client.barrier(self.msg_ids, timeout)
         if self._ready:
             try:
                 results = map(self._client.results.get, self.msg_ids)
                 self._result = results
                 if self._single_result:
                     r = results[0]
                     if isinstance(r, Exception):
                         raise r
                 else:
                     results = error.collect_exceptions(results, self._fname)
                     self._result = self._reconstruct_result(results)
             except Exception, e:
                 self._exception = e
                 self._success = False
             else:
                 self._success = True
             finally:
                 self._metadata = map(self._client.metadata.get, self.msg_ids)
 
 
     def successful(self):
         """Return whether the call completed without raising an exception.
 
         Will raise ``AssertionError`` if the result is not ready.
         """
         assert self.ready()
         return self._success
 
     #----------------------------------------------------------------
     # Extra methods not in mp.pool.AsyncResult
     #----------------------------------------------------------------
 
     def get_dict(self, timeout=-1):
         """Get the results as a dict, keyed by engine_id.
 
         timeout behavior is described in `get()`.
         """
 
         results = self.get(timeout)
         engine_ids = [ md['engine_id'] for md in self._metadata ]
         bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
         maxcount = bycount.count(bycount[-1])
         if maxcount > 1:
             raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
                     maxcount, bycount[-1]))
 
         return dict(zip(engine_ids,results))
 
     @property
     @check_ready
     def result(self):
         """result property wrapper for `get(timeout=0)`."""
         return self._result
 
     # abbreviated alias:
     r = result
 
     @property
     @check_ready
     def metadata(self):
         """property for accessing execution metadata."""
         if self._single_result:
             return self._metadata[0]
         else:
             return self._metadata
 
     @property
     def result_dict(self):
         """result property as a dict."""
         return self.get_dict(0)
 
     def __dict__(self):
         return self.get_dict(0)
 
     def abort(self):
         """abort my tasks."""
         assert not self.ready(), "Can't abort, I am already done!"
         return self._client.abort(self.msg_ids, targets=self._targets, block=True)
 
     @property
     def sent(self):
         """check whether my messages have been sent"""
         if self._tracker is None:
             return True
         else:
             return self._tracker.done
 
     #-------------------------------------
     # dict-access
     #-------------------------------------
 
     @check_ready
     def __getitem__(self, key):
         """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
         """
         if isinstance(key, int):
             return error.collect_exceptions([self._result[key]], self._fname)[0]
         elif isinstance(key, slice):
             return error.collect_exceptions(self._result[key], self._fname)
         elif isinstance(key, basestring):
             values = [ md[key] for md in self._metadata ]
             if self._single_result:
                 return values[0]
             else:
                 return values
         else:
             raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
 
     @check_ready
     def __getattr__(self, key):
         """getattr maps to getitem for convenient attr access to metadata."""
         if key not in self._metadata[0].keys():
             raise AttributeError("%r object has no attribute %r"%(
                     self.__class__.__name__, key))
         return self.__getitem__(key)
 
     # asynchronous iterator:
     def __iter__(self):
         if self._single_result:
             raise TypeError("AsyncResults with a single result are not iterable.")
         try:
             rlist = self.get(0)
         except error.TimeoutError:
             # wait for each result individually
             for msg_id in self.msg_ids:
                 ar = AsyncResult(self._client, msg_id, self._fname)
                 yield ar.get()
         else:
             # already done
             for r in rlist:
                 yield r
 
 
 
 class AsyncMapResult(AsyncResult):
     """Class for representing results of non-blocking gathers.
 
     This will properly reconstruct the gather.
     """
 
     def __init__(self, client, msg_ids, mapObject, fname=''):
         AsyncResult.__init__(self, client, msg_ids, fname=fname)
         self._mapObject = mapObject
         self._single_result = False
 
     def _reconstruct_result(self, res):
         """Perform the gather on the actual results."""
         return self._mapObject.joinPartitions(res)
 
     # asynchronous iterator:
     def __iter__(self):
         try:
             rlist = self.get(0)
         except error.TimeoutError:
             # wait for each result individually
             for msg_id in self.msg_ids:
                 ar = AsyncResult(self._client, msg_id, self._fname)
                 rlist = ar.get()
                 try:
                     for r in rlist:
                         yield r
                 except TypeError:
                     # flattened, not a list
                     # this could get broken by flattened data that returns iterables
                     # but most calls to map do not expose the `flatten` argument
                     yield rlist
         else:
             # already done
             for r in rlist:
                 yield r
 
 
 class AsyncHubResult(AsyncResult):
     """Class to wrap pending results that must be requested from the Hub.
 
     Note that waiting/polling on these objects requires polling the Hub over the network,
     so use `AsyncHubResult.wait()` sparingly.
     """
 
     def wait(self, timeout=-1):
         """wait for result to complete."""
         start = time.time()
         if self._ready:
             return
         local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
         local_ready = self._client.barrier(local_ids, timeout)
         if local_ready:
             remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
             if not remote_ids:
                 self._ready = True
             else:
                 rdict = self._client.result_status(remote_ids, status_only=False)
                 pending = rdict['pending']
                 while pending and (timeout < 0 or time.time() < start+timeout):
                     rdict = self._client.result_status(remote_ids, status_only=False)
                     pending = rdict['pending']
                     if pending:
                         time.sleep(0.1)
                 if not pending:
                     self._ready = True
         if self._ready:
             try:
                 results = map(self._client.results.get, self.msg_ids)
                 self._result = results
                 if self._single_result:
                     r = results[0]
                     if isinstance(r, Exception):
                         raise r
                 else:
                     results = error.collect_exceptions(results, self._fname)
                     self._result = self._reconstruct_result(results)
             except Exception, e:
                 self._exception = e
                 self._success = False
             else:
                 self._success = True
             finally:
                 self._metadata = map(self._client.metadata.get, self.msg_ids)
 
-__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
\ No newline at end of file
+__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
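
For context on how callers interact with these classes, a minimal usage sketch. It assumes a controller and engines are already running, and that this branch's Client exposes apply(..., block=False) returning an AsyncResult; the import path and call signature are assumptions, not taken from this diff:

from __future__ import print_function
from IPython.zmq.parallel.client import Client   # path assumed for this branch

rc = Client()

def double(x):
    return 2 * x

# non-blocking apply returns an AsyncResult immediately (assumed signature)
ar = rc.apply(double, args=(21,), block=False)

print(ar.ready())          # False until the engine has replied
print(ar.get(timeout=5))   # 42, or raises error.TimeoutError if not done
print(ar['engine_id'])     # str keys index execution metadata (__getitem__)
print(ar.get_dict(0))      # {engine_id: result}; requires one job per engine

The string-keyed access works because __getitem__ and __getattr__ above fall through to the metadata dicts once the result is ready.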
@@ -1,153 +1,159 @@
 """Dependency utilities"""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010-2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
 
 from IPython.external.decorator import decorator
 
 from .asyncresult import AsyncResult
 from .error import UnmetDependency
 
 
 class depend(object):
     """Dependency decorator, for use with tasks.
 
     `@depend` lets you define a function for engine dependencies
     just like you use `apply` for tasks.
 
 
     Examples
     --------
     ::
 
         @depend(df, a,b, c=5)
         def f(m,n,p):
 
         view.apply(f, 1,2,3)
 
     will call df(a,b,c=5) on the engine, and if it returns False or
     raises an UnmetDependency error, then the task will not be run
     and another engine will be tried.
     """
     def __init__(self, f, *args, **kwargs):
         self.f = f
         self.args = args
         self.kwargs = kwargs
 
     def __call__(self, f):
         return dependent(f, self.f, *self.args, **self.kwargs)
 
 class dependent(object):
     """A function that depends on another function.
     This is an object to prevent the closure used
     in traditional decorators, which are not picklable.
     """
 
     def __init__(self, f, df, *dargs, **dkwargs):
         self.f = f
         self.func_name = getattr(f, '__name__', 'f')
         self.df = df
         self.dargs = dargs
         self.dkwargs = dkwargs
 
     def __call__(self, *args, **kwargs):
         if self.df(*self.dargs, **self.dkwargs) is False:
             raise UnmetDependency()
         return self.f(*args, **kwargs)
 
     @property
     def __name__(self):
         return self.func_name
 
 def _require(*names):
     """Helper for @require decorator."""
     for name in names:
         try:
             __import__(name)
         except ImportError:
             return False
     return True
 
 def require(*names):
     """Simple decorator for requiring names to be importable.
 
     Examples
     --------
 
     In [1]: @require('numpy')
        ...: def norm(a):
        ...:     import numpy
        ...:     return numpy.linalg.norm(a,2)
     """
     return depend(_require, *names)
 
 class Dependency(set):
     """An object for representing a set of msg_id dependencies.
 
     Subclassed from set().
 
     Parameters
     ----------
     dependencies: list/set of msg_ids or AsyncResult objects or output of Dependency.as_dict()
         The msg_ids to depend on
     all : bool [default True]
         Whether the dependency should be considered met when *all* depending tasks have completed
         or only when *any* have been completed.
     success_only : bool [default True]
         Whether to consider only successes for Dependencies, or consider failures as well.
         If `all=success_only=True`, then this task will fail with an ImpossibleDependency
         as soon as the first depended-upon task fails.
     """
 
     all=True
     success_only=True
 
     def __init__(self, dependencies=[], all=True, success_only=True):
         if isinstance(dependencies, dict):
             # load from dict
             all = dependencies.get('all', True)
             success_only = dependencies.get('success_only', success_only)
             dependencies = dependencies.get('dependencies', [])
         ids = []
         if isinstance(dependencies, AsyncResult):
             ids.extend(dependencies.msg_ids)
         else:
             for d in dependencies:
                 if isinstance(d, basestring):
                     ids.append(d)
                 elif isinstance(d, AsyncResult):
                     ids.extend(d.msg_ids)
                 else:
                     raise TypeError("invalid dependency type: %r"%type(d))
         set.__init__(self, ids)
         self.all = all
         self.success_only=success_only
 
     def check(self, completed, failed=None):
         if failed is not None and not self.success_only:
             completed = completed.union(failed)
         if len(self) == 0:
             return True
         if self.all:
             return self.issubset(completed)
         else:
             return not self.isdisjoint(completed)
 
     def unreachable(self, failed):
         if len(self) == 0 or len(failed) == 0 or not self.success_only:
             return False
         # print self, self.success_only, self.all, failed
         if self.all:
             return not self.isdisjoint(failed)
         else:
             return self.issubset(failed)
 
 
     def as_dict(self):
         """Represent this dependency as a dict. For json compatibility."""
         return dict(
             dependencies=list(self),
             all=self.all,
             success_only=self.success_only,
         )
 
 
 __all__ = ['depend', 'require', 'dependent', 'Dependency']
 
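
Since dependent objects are plain callables, most of this module can be exercised locally without a cluster. A short sketch under that assumption; module paths follow this branch's IPython.zmq.parallel layout, and has_numpy is a made-up dependency check in the spirit of the @depend docstring:

from __future__ import print_function
from IPython.zmq.parallel.dependencies import depend, require, Dependency

@require('os', 'sys')
def pid():
    import os
    return os.getpid()

print(pid())                    # both modules import, so the task body runs

def has_numpy():                # hypothetical engine-side functional check
    try:
        import numpy
        return True
    except ImportError:
        return False

@depend(has_numpy)
def norm(a):
    import numpy
    return numpy.linalg.norm(a, 2)   # raises UnmetDependency on engines without numpy

# msg_id dependencies survive a round-trip through as_dict() for the wire:
dep = Dependency(['msg-a', 'msg-b'], all=True, success_only=True)
print(dep.check(set(['msg-a'])))           # False: 'msg-b' has not completed
print(Dependency(dep.as_dict()) == dep)    # True: rebuilt from its dict form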
@@ -1,148 +1,155 @@
 #!/usr/bin/env python
 """A simple engine that talks to a controller over 0MQ.
 it handles registration, etc. and launches a kernel
 connected to the Controller's Schedulers.
 """
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010-2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
 from __future__ import print_function
 
 import sys
 import time
 
 import zmq
 from zmq.eventloop import ioloop, zmqstream
 
 # internal
 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat
 # from IPython.utils.localinterfaces import LOCALHOST
 
 from . import heartmonitor
 from .factory import RegistrationFactory
 from .streamkernel import Kernel
 from .streamsession import Message
 from .util import disambiguate_url
 
 class EngineFactory(RegistrationFactory):
     """IPython engine"""
 
     # configurables:
     user_ns=Dict(config=True)
     out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
     display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
     location=Str(config=True)
     timeout=CFloat(2,config=True)
 
     # not configurable:
     id=Int(allow_none=True)
     registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
     kernel=Instance(Kernel)
 
 
     def __init__(self, **kwargs):
         super(EngineFactory, self).__init__(**kwargs)
         ctx = self.context
 
         reg = ctx.socket(zmq.XREQ)
         reg.setsockopt(zmq.IDENTITY, self.ident)
         reg.connect(self.url)
         self.registrar = zmqstream.ZMQStream(reg, self.loop)
 
     def register(self):
         """send the registration_request"""
 
         self.log.info("registering")
         content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
         self.registrar.on_recv(self.complete_registration)
         # print (self.session.key)
         self.session.send(self.registrar, "registration_request",content=content)
 
     def complete_registration(self, msg):
         # print msg
         self._abort_dc.stop()
         ctx = self.context
         loop = self.loop
         identity = self.ident
 
         idents,msg = self.session.feed_identities(msg)
         msg = Message(self.session.unpack_message(msg))
 
         if msg.content.status == 'ok':
             self.id = int(msg.content.id)
 
             # create Shell Streams (MUX, Task, etc.):
             queue_addr = msg.content.mux
             shell_addrs = [ str(queue_addr) ]
             task_addr = msg.content.task
             if task_addr:
                 shell_addrs.append(str(task_addr))
 
             # Uncomment this to go back to two-socket model
             # shell_streams = []
             # for addr in shell_addrs:
             #     stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
             #     stream.setsockopt(zmq.IDENTITY, identity)
             #     stream.connect(disambiguate_url(addr, self.location))
             #     shell_streams.append(stream)
 
             # Now use only one shell stream for mux and tasks
             stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
             stream.setsockopt(zmq.IDENTITY, identity)
             shell_streams = [stream]
             for addr in shell_addrs:
                 stream.connect(disambiguate_url(addr, self.location))
             # end single stream-socket
 
             # control stream:
             control_addr = str(msg.content.control)
             control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
             control_stream.setsockopt(zmq.IDENTITY, identity)
             control_stream.connect(disambiguate_url(control_addr, self.location))
 
             # create iopub stream:
             iopub_addr = msg.content.iopub
             iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
             iopub_stream.setsockopt(zmq.IDENTITY, identity)
             iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
 
             # launch heartbeat
             hb_addrs = msg.content.heartbeat
             # print (hb_addrs)
 
             # # Redirect input streams and set a display hook.
             if self.out_stream_factory:
                 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
                 sys.stdout.topic = 'engine.%i.stdout'%self.id
                 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
                 sys.stderr.topic = 'engine.%i.stderr'%self.id
             if self.display_hook_factory:
                 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
                 sys.displayhook.topic = 'engine.%i.pyout'%self.id
 
             self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
                     control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
                     loop=loop, user_ns = self.user_ns, logname=self.log.name)
             self.kernel.start()
             hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
             heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
             # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
             heart.start()
 
 
         else:
             self.log.fatal("Registration Failed: %s"%msg)
             raise Exception("Registration Failed: %s"%msg)
 
         self.log.info("Completed registration with id %i"%self.id)
 
 
     def abort(self):
         self.log.fatal("Registration timed out")
         self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
         time.sleep(1)
         sys.exit(255)
 
     def start(self):
         dc = ioloop.DelayedCallback(self.register, 0, self.loop)
         dc.start()
         self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
         self._abort_dc.start()
 
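
The start()/abort() pair above implements a register-with-timeout pattern: schedule the registration attempt immediately, arm an abort callback at timeout seconds, and cancel it in complete_registration() once the reply arrives. An illustrative reduction of that pattern, with stand-in callbacks rather than the real EngineFactory methods:

from __future__ import print_function
from zmq.eventloop import ioloop

loop = ioloop.IOLoop.instance()
timeout = 2.0   # seconds, mirroring the `timeout` configurable above

def register():
    print("sending registration_request")   # stand-in for register()

def abort():
    print("registration timed out")         # stand-in for abort()
    loop.stop()

ioloop.DelayedCallback(register, 0, loop).start()
abort_dc = ioloop.DelayedCallback(abort, timeout * 1000, loop)
abort_dc.start()
# a successful reply handler would call abort_dc.stop() before this fires,
# exactly as complete_registration() does with self._abort_dc
loop.start()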
@@ -1,119 +1,125 @@
 """ Defines helper functions for creating kernel entry points and process
 launchers.
 
 ************
 NOTE: Most of this module has been deprecated by moving to Configurables
 ************
 """
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010-2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
 
 # Standard library imports.
 import atexit
 import logging
 import os
 import stat
 import socket
 import sys
 from signal import signal, SIGINT, SIGABRT, SIGTERM
 from subprocess import Popen, PIPE
 try:
     from signal import SIGKILL
 except ImportError:
     SIGKILL=None
 
 # System library imports.
 import zmq
 from zmq.log import handlers
 
 # Local imports.
 from IPython.core.ultratb import FormattedTB
 from IPython.external.argparse import ArgumentParser
 from IPython.zmq.log import EnginePUBHandler
 
 _random_ports = set()
 
 def select_random_ports(n):
     """Selects and returns n random ports that are available."""
     ports = []
     for i in xrange(n):
         sock = socket.socket()
         sock.bind(('', 0))
         while sock.getsockname()[1] in _random_ports:
             sock.close()
             sock = socket.socket()
             sock.bind(('', 0))
         ports.append(sock)
     for i, sock in enumerate(ports):
         port = sock.getsockname()[1]
         sock.close()
         ports[i] = port
         _random_ports.add(port)
     return ports
 
 def signal_children(children):
     """Relay interrupt/term signals to children, for more solid process cleanup."""
     def terminate_children(sig, frame):
         logging.critical("Got signal %i, terminating children..."%sig)
         for child in children:
             child.terminate()
 
         sys.exit(sig != SIGINT)
         # sys.exit(sig)
     for sig in (SIGINT, SIGABRT, SIGTERM):
         signal(sig, terminate_children)
 
 def generate_exec_key(keyfile):
     import uuid
     newkey = str(uuid.uuid4())
     with open(keyfile, 'w') as f:
         # f.write('ipython-key ')
         f.write(newkey+'\n')
     # set user-only RW permissions (0600)
     # this will have no effect on Windows
     os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
 
 
 def integer_loglevel(loglevel):
     try:
         loglevel = int(loglevel)
     except ValueError:
         if isinstance(loglevel, str):
             loglevel = getattr(logging, loglevel)
     return loglevel
 
 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
     logger = logging.getLogger(logname)
     if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
         # don't add a second PUBHandler
         return
     loglevel = integer_loglevel(loglevel)
     lsock = context.socket(zmq.PUB)
     lsock.connect(iface)
     handler = handlers.PUBHandler(lsock)
     handler.setLevel(loglevel)
     handler.root_topic = root
     logger.addHandler(handler)
     logger.setLevel(loglevel)
 
 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
     logger = logging.getLogger()
     if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
         # don't add a second PUBHandler
         return
     loglevel = integer_loglevel(loglevel)
     lsock = context.socket(zmq.PUB)
     lsock.connect(iface)
     handler = EnginePUBHandler(engine, lsock)
     handler.setLevel(loglevel)
     logger.addHandler(handler)
     logger.setLevel(loglevel)
 
 def local_logger(logname, loglevel=logging.DEBUG):
     loglevel = integer_loglevel(loglevel)
     logger = logging.getLogger(logname)
     if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
         # don't add a second StreamHandler
         return
     handler = logging.StreamHandler()
     handler.setLevel(loglevel)
     logger.addHandler(handler)
     logger.setLevel(loglevel)
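
A minimal usage sketch for two of the helpers that survive the deprecation note above; the module path is assumed from this branch's layout:

from __future__ import print_function
import logging
from IPython.zmq.parallel.entry_point import select_random_ports, local_logger

ports = select_random_ports(3)       # three distinct, currently-free TCP ports
print(ports)                         # e.g. [50321, 50843, 51007]

local_logger('myapp', logging.INFO)  # attaches a StreamHandler exactly once
logging.getLogger('myapp').info("bound to ports %s", ports)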
@@ -1,158 +1,164 @@
 #!/usr/bin/env python
 """
 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
 and hearts are tracked based on their XREQ identities.
 """
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010-2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
 
 from __future__ import print_function
 import time
 import logging
 import uuid
 
 import zmq
 from zmq.devices import ProcessDevice,ThreadDevice
 from zmq.eventloop import ioloop, zmqstream
 
 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
 from .factory import LoggingFactory
 
 class Heart(object):
     """A basic heart object for responding to a HeartMonitor.
     This is a simple wrapper with defaults for the most common
     Device model for responding to heartbeats.
 
     It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
     SUB/XREQ for in/out.
 
     You can specify the XREQ's IDENTITY via the optional heart_id argument."""
     device=None
     id=None
     def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
         self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
         self.device.daemon=True
         self.device.connect_in(in_addr)
         self.device.connect_out(out_addr)
         if in_type == zmq.SUB:
             self.device.setsockopt_in(zmq.SUBSCRIBE, "")
         if heart_id is None:
             heart_id = str(uuid.uuid4())
         self.device.setsockopt_out(zmq.IDENTITY, heart_id)
         self.id = heart_id
 
     def start(self):
         return self.device.start()
 
 class HeartMonitor(LoggingFactory):
     """A basic HeartMonitor class
     pingstream: a PUB stream
     pongstream: an XREP stream
     period: the period of the heartbeat in milliseconds"""
 
     period=CFloat(1000, config=True) # in milliseconds
 
     pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
     pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
     loop = Instance('zmq.eventloop.ioloop.IOLoop')
     def _loop_default(self):
         return ioloop.IOLoop.instance()
     debug=Bool(False)
 
     # not settable:
     hearts=Set()
     responses=Set()
     on_probation=Set()
     last_ping=CFloat(0)
     _new_handlers = Set()
     _failure_handlers = Set()
     lifetime = CFloat(0)
     tic = CFloat(0)
 
     def __init__(self, **kwargs):
         super(HeartMonitor, self).__init__(**kwargs)
 
         self.pongstream.on_recv(self.handle_pong)
 
     def start(self):
         self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
         self.caller.start()
 
     def add_new_heart_handler(self, handler):
         """add a new handler for new hearts"""
         self.log.debug("heartbeat::new_heart_handler: %s"%handler)
         self._new_handlers.add(handler)
 
     def add_heart_failure_handler(self, handler):
         """add a new handler for heart failure"""
         self.log.debug("heartbeat::new heart failure handler: %s"%handler)
         self._failure_handlers.add(handler)
 
     def beat(self):
         self.pongstream.flush()
         self.last_ping = self.lifetime
 
         toc = time.time()
         self.lifetime += toc-self.tic
         self.tic = toc
         # self.log.debug("heartbeat::%s"%self.lifetime)
         goodhearts = self.hearts.intersection(self.responses)
         missed_beats = self.hearts.difference(goodhearts)
         heartfailures = self.on_probation.intersection(missed_beats)
         newhearts = self.responses.difference(goodhearts)
         map(self.handle_new_heart, newhearts)
         map(self.handle_heart_failure, heartfailures)
         self.on_probation = missed_beats.intersection(self.hearts)
         self.responses = set()
         # print self.on_probation, self.hearts
         # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
         self.pingstream.send(str(self.lifetime))
 
     def handle_new_heart(self, heart):
         if self._new_handlers:
             for handler in self._new_handlers:
                 handler(heart)
         else:
             self.log.info("heartbeat::yay, got new heart %s!"%heart)
         self.hearts.add(heart)
 
     def handle_heart_failure(self, heart):
         if self._failure_handlers:
             for handler in self._failure_handlers:
                 try:
                     handler(heart)
                 except Exception as e:
                     self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
                     pass
         else:
             self.log.info("heartbeat::Heart %s failed :("%heart)
         self.hearts.remove(heart)
 
 
     def handle_pong(self, msg):
         "a heart just beat"
         if msg[1] == str(self.lifetime):
             delta = time.time()-self.tic
             # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
             self.responses.add(msg[0])
         elif msg[1] == str(self.last_ping):
             delta = time.time()-self.tic + (self.lifetime-self.last_ping)
             self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
             self.responses.add(msg[0])
         else:
             self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
                           (msg[1],self.lifetime))
 
 
 if __name__ == '__main__':
     loop = ioloop.IOLoop.instance()
     context = zmq.Context()
     pub = context.socket(zmq.PUB)
     pub.bind('tcp://127.0.0.1:5555')
     xrep = context.socket(zmq.XREP)
     xrep.bind('tcp://127.0.0.1:5556')
 
     outstream = zmqstream.ZMQStream(pub, loop)
     instream = zmqstream.ZMQStream(xrep, loop)
 
     hb = HeartMonitor(loop, outstream, instream)
 
     loop.start()
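
A companion sketch to the __main__ demo above: run the demo in one process and a Heart like this in another. The addresses mirror the demo's bind calls; heart_id is an arbitrary identity the monitor will report, and the module path is assumed from this branch's layout:

import time
from IPython.zmq.parallel.heartmonitor import Heart

heart = Heart('tcp://127.0.0.1:5555',   # SUB in: receives the PUB pings
              'tcp://127.0.0.1:5556',   # XREQ out: pongs back to the XREP
              heart_id='demo-heart')
heart.start()                           # FORWARDER device in a daemon thread
time.sleep(10)                          # keep the process alive so it keeps beating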
@@ -1,90 +1,97 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Old ipcluster script. Possibly to be removed."""
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2010-2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
2 from __future__ import print_function
9 from __future__ import print_function
3
10
4 import os
11 import os
5 import sys
12 import sys
6 import time
13 import time
7 from subprocess import Popen, PIPE
14 from subprocess import Popen, PIPE
8
15
9 from IPython.external.argparse import ArgumentParser, SUPPRESS
16 from IPython.external.argparse import ArgumentParser, SUPPRESS
10
17
11 def _filter_arg(flag, args):
18 def _filter_arg(flag, args):
12 filtered = []
19 filtered = []
13 if flag in args:
20 if flag in args:
14 filtered.append(flag)
21 filtered.append(flag)
15 idx = args.index(flag)
22 idx = args.index(flag)
16 if len(args) > idx+1:
23 if len(args) > idx+1:
17 if not args[idx+1].startswith('-'):
24 if not args[idx+1].startswith('-'):
18 filtered.append(args[idx+1])
25 filtered.append(args[idx+1])
19 return filtered
26 return filtered
20
27
21 def filter_args(flags, args=sys.argv[1:]):
28 def filter_args(flags, args=sys.argv[1:]):
22 filtered = []
29 filtered = []
23 for flag in flags:
30 for flag in flags:
24 if isinstance(flag, (list,tuple)):
31 if isinstance(flag, (list,tuple)):
25 for f in flag:
32 for f in flag:
26 filtered.extend(_filter_arg(f, args))
33 filtered.extend(_filter_arg(f, args))
27 else:
34 else:
28 filtered.extend(_filter_arg(flag, args))
35 filtered.extend(_filter_arg(flag, args))
29 return filtered
36 return filtered
30
37
31 def _strip_arg(flag, args):
38 def _strip_arg(flag, args):
32 while flag in args:
39 while flag in args:
33 idx = args.index(flag)
40 idx = args.index(flag)
34 args.pop(idx)
41 args.pop(idx)
35 if len(args) > idx:
42 if len(args) > idx:
36 if not args[idx].startswith('-'):
43 if not args[idx].startswith('-'):
37 args.pop(idx)
44 args.pop(idx)
38
45
39 def strip_args(flags, args=sys.argv[1:]):
46 def strip_args(flags, args=sys.argv[1:]):
40 args = list(args)
47 args = list(args)
41 for flag in flags:
48 for flag in flags:
42 if isinstance(flag, (list,tuple)):
49 if isinstance(flag, (list,tuple)):
43 for f in flag:
50 for f in flag:
44 _strip_arg(f, args)
51 _strip_arg(f, args)
45 else:
52 else:
46 _strip_arg(flag, args)
53 _strip_arg(flag, args)
47 return args
54 return args
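A worked example makes the complementary behavior of the two helpers concrete (argument values are made up):

argv = ['--n', '4', '--ip', '127.0.0.1', '-x']
filter_args([['--n', '-n']], argv)  # -> ['--n', '4']
strip_args([['--n', '-n']], argv)   # -> ['--ip', '127.0.0.1', '-x']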
48
55
49
56
50 def launch_process(mod, args):
57 def launch_process(mod, args):
51 """Launch a controller or engine in a subprocess."""
58 """Launch a controller or engine in a subprocess."""
52 code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
59 code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
53 arguments = [ sys.executable, '-c', code ] + args
60 arguments = [ sys.executable, '-c', code ] + args
54 blackholew = open(os.devnull, 'w')
61 blackholew = open(os.devnull, 'w')
55 blackholer = open(os.devnull, 'r')
62 blackholer = open(os.devnull, 'r')
56
63
57 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE)
64 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=PIPE)
58 return proc
65 return proc
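Usage is then a one-liner; for instance, to spawn a single engine (module name and flag taken from main() below):

proc = launch_process('ipengineapp', ['--ident', 'engine-0'])
if proc.poll() is None:   # poll() stays None while the subprocess runs
    print("engine is up")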
59
66
60 def main():
67 def main():
61 parser = ArgumentParser(argument_default=SUPPRESS)
68 parser = ArgumentParser(argument_default=SUPPRESS)
62 parser.add_argument('--n', '-n', type=int, default=1,
69 parser.add_argument('--n', '-n', type=int, default=1,
63 help="The number of engines to start.")
70 help="The number of engines to start.")
64 ns,args = parser.parse_known_args()
71 ns,args = parser.parse_known_args()
65 n = ns.n
72 n = ns.n
66
73
67 controller = launch_process('ipcontrollerapp', args)
74 controller = launch_process('ipcontrollerapp', args)
68 for i in range(10):
75 for i in range(10):
69 time.sleep(.1)
76 time.sleep(.1)
70 if controller.poll() is not None:
77 if controller.poll() is not None:
71 print("Controller failed to launch:")
78 print("Controller failed to launch:")
72 print (controller.stderr.read())
79 print (controller.stderr.read())
73 sys.exit(255)
80 sys.exit(255)
74
81
75 print("Launched Controller")
82 print("Launched Controller")
76 engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
83 engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
77 print("%i Engines started"%n)
84 print("%i Engines started"%n)
78
85
79 def wait_quietly(p):
86 def wait_quietly(p):
80 try:
87 try:
81 p.wait()
88 p.wait()
82 except KeyboardInterrupt:
89 except KeyboardInterrupt:
83 pass
90 pass
84
91
85 wait_quietly(controller)
92 wait_quietly(controller)
86 map(wait_quietly, engines)
93 map(wait_quietly, engines)
87 print ("Engines cleaned up.")
94 print ("Engines cleaned up.")
88
95
89 if __name__ == '__main__':
96 if __name__ == '__main__':
90 main() No newline at end of file
97 main()
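Invoked as, say, python ipcluster.py -n 4 (the script name here is illustrative), parse_known_args consumes only the -n flag; every remaining argument is forwarded verbatim to the controller and to each engine subprocess.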
@@ -1,219 +1,225 b''
1 """KernelStarter class that intercepts Control Queue messages, and handles process management."""
1 """KernelStarter class that intercepts Control Queue messages, and handles process management."""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010-2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
2
8
3 import zmq
9 import zmq
4 from zmq.eventloop import ioloop, zmqstream
10 from zmq.eventloop import ioloop, zmqstream
4
10
5 from .streamsession import StreamSession
11 from .streamsession import StreamSession
6
12
7 class KernelStarter(object):
13 class KernelStarter(object):
8 """Object for resetting/killing the Kernel."""
14 """Object for resetting/killing the Kernel."""
9
15
10
16
11 def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs):
17 def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs):
12 self.session = session
18 self.session = session
13 self.upstream = upstream
19 self.upstream = upstream
14 self.downstream = downstream
20 self.downstream = downstream
15 self.kernel_args = kernel_args
21 self.kernel_args = kernel_args
16 self.kernel_kwargs = kernel_kwargs
22 self.kernel_kwargs = kernel_kwargs
17 self.handlers = {}
23 self.handlers = {}
18 for method in 'shutdown_request shutdown_reply'.split():
24 for method in 'shutdown_request shutdown_reply'.split():
19 self.handlers[method] = getattr(self, method)
25 self.handlers[method] = getattr(self, method)
20
26
21 def start(self):
27 def start(self):
22 self.upstream.on_recv(self.dispatch_request)
28 self.upstream.on_recv(self.dispatch_request)
23 self.downstream.on_recv(self.dispatch_reply)
29 self.downstream.on_recv(self.dispatch_reply)
24
30
25 #--------------------------------------------------------------------------
31 #--------------------------------------------------------------------------
26 # Dispatch methods
32 # Dispatch methods
27 #--------------------------------------------------------------------------
33 #--------------------------------------------------------------------------
28
34
29 def dispatch_request(self, raw_msg):
35 def dispatch_request(self, raw_msg):
30 idents, msg = self.session.feed_identities(raw_msg)
36 idents, msg = self.session.feed_identities(raw_msg)
31 try:
37 try:
32 msg = self.session.unpack_message(msg, content=False)
38 msg = self.session.unpack_message(msg, content=False)
33 except Exception:
39 except Exception:
34 print ("bad msg: %s"%msg)
40 print ("bad msg: %s"%msg)
35 return
41 return
35
41
36 msgtype = msg['msg_type']
42 msgtype = msg['msg_type']
37 handler = self.handlers.get(msgtype, None)
43 handler = self.handlers.get(msgtype, None)
38 if handler is None:
44 if handler is None:
39 self.downstream.send_multipart(raw_msg, copy=False)
45 self.downstream.send_multipart(raw_msg, copy=False)
40 else:
46 else:
41 handler(msg)
47 handler(msg)
42
48
43 def dispatch_reply(self, raw_msg):
49 def dispatch_reply(self, raw_msg):
44 idents, msg = self.session.feed_identities(raw_msg)
50 idents, msg = self.session.feed_identities(raw_msg)
45 try:
51 try:
46 msg = self.session.unpack_message(msg, content=False)
52 msg = self.session.unpack_message(msg, content=False)
47 except Exception:
53 except Exception:
48 print ("bad msg: %s"%msg)
54 print ("bad msg: %s"%msg)
49 return
55 return
49
55
50 msgtype = msg['msg_type']
56 msgtype = msg['msg_type']
51 handler = self.handlers.get(msgtype, None)
57 handler = self.handlers.get(msgtype, None)
52 if handler is None:
58 if handler is None:
53 self.upstream.send_multipart(raw_msg, copy=False)
59 self.upstream.send_multipart(raw_msg, copy=False)
54 else:
60 else:
55 handler(msg)
61 handler(msg)
56
62
57 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
58 # Handlers
64 # Handlers
59 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
60
66
61 def shutdown_request(self, msg):
67 def shutdown_request(self, msg):
62 """"""
68 """"""
63 self.downstream.send_multipart(msg)
69 self.downstream.send_multipart(msg)
64
70
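Note that __init__ above also looks up a shutdown_reply handler via getattr, but no such method appears in the file as shown; a minimal stub in the same spirit (an assumption, mirroring shutdown_request in the opposite direction) would be:

def shutdown_reply(self, msg):
    """Relay a shutdown reply back up toward the client."""
    self.upstream.send_multipart(msg)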
65 #--------------------------------------------------------------------------
71 #--------------------------------------------------------------------------
66 # Kernel process management methods, from KernelManager:
72 # Kernel process management methods, from KernelManager:
67 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
68
74
69 def _check_local(self, addr):
75 def _check_local(self, addr):
70 if isinstance(addr, tuple):
76 if isinstance(addr, tuple):
71 addr = addr[0]
77 addr = addr[0]
72 return addr in LOCAL_IPS
78 return addr in LOCAL_IPS
73
79
74 def start_kernel(self, **kw):
80 def start_kernel(self, **kw):
75 """Starts a kernel process and configures the manager to use it.
81 """Starts a kernel process and configures the manager to use it.
76
82
77 If random ports (port=0) are being used, this method must be called
83 If random ports (port=0) are being used, this method must be called
78 before the channels are created.
84 before the channels are created.
79
85
80 Parameters
86 Parameters
81 ----------
87 ----------
82 ipython : bool, optional (default True)
88 ipython : bool, optional (default True)
83 Whether to use an IPython kernel instead of a plain Python kernel.
89 Whether to use an IPython kernel instead of a plain Python kernel.
84 """
90 """
85 self.kernel = Process(target=make_kernel, args=self.kernel_args,
91 self.kernel = Process(target=make_kernel, args=self.kernel_args,
86 kwargs=self.kernel_kwargs)
92 kwargs=self.kernel_kwargs)
87
93
88 def shutdown_kernel(self, restart=False):
94 def shutdown_kernel(self, restart=False):
89 """ Attempts to the stop the kernel process cleanly. If the kernel
95 """ Attempts to the stop the kernel process cleanly. If the kernel
90 cannot be stopped, it is killed, if possible.
96 cannot be stopped, it is killed, if possible.
91 """
97 """
92 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
98 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
93 if sys.platform == 'win32':
99 if sys.platform == 'win32':
94 self.kill_kernel()
100 self.kill_kernel()
95 return
101 return
96
102
97 # Don't send any additional kernel kill messages immediately, to give
103 # Don't send any additional kernel kill messages immediately, to give
98 # the kernel a chance to properly execute shutdown actions. Wait for at
104 # the kernel a chance to properly execute shutdown actions. Wait for at
99 # most 1s, checking every 0.1s.
105 # most 1s, checking every 0.1s.
100 self.xreq_channel.shutdown(restart=restart)
106 self.xreq_channel.shutdown(restart=restart)
101 for i in range(10):
107 for i in range(10):
102 if self.is_alive:
108 if self.is_alive:
103 time.sleep(0.1)
109 time.sleep(0.1)
104 else:
110 else:
105 break
111 break
106 else:
112 else:
107 # OK, we've waited long enough.
113 # OK, we've waited long enough.
108 if self.has_kernel:
114 if self.has_kernel:
109 self.kill_kernel()
115 self.kill_kernel()
110
116
111 def restart_kernel(self, now=False):
117 def restart_kernel(self, now=False):
112 """Restarts a kernel with the same arguments that were used to launch
118 """Restarts a kernel with the same arguments that were used to launch
113 it. If the old kernel was launched with random ports, the same ports
119 it. If the old kernel was launched with random ports, the same ports
114 will be used for the new kernel.
120 will be used for the new kernel.
115
121
116 Parameters
122 Parameters
117 ----------
123 ----------
118 now : bool, optional
124 now : bool, optional
119 If True, the kernel is forcefully restarted *immediately*, without
125 If True, the kernel is forcefully restarted *immediately*, without
120 having a chance to do any cleanup action. Otherwise the kernel is
126 having a chance to do any cleanup action. Otherwise the kernel is
121 given 1s to clean up before a forceful restart is issued.
127 given 1s to clean up before a forceful restart is issued.
122
128
123 In all cases the kernel is restarted, the only difference is whether
129 In all cases the kernel is restarted, the only difference is whether
124 it is given a chance to perform a clean shutdown or not.
130 it is given a chance to perform a clean shutdown or not.
125 """
131 """
126 if self._launch_args is None:
132 if self._launch_args is None:
127 raise RuntimeError("Cannot restart the kernel. "
133 raise RuntimeError("Cannot restart the kernel. "
128 "No previous call to 'start_kernel'.")
134 "No previous call to 'start_kernel'.")
129 else:
135 else:
130 if self.has_kernel:
136 if self.has_kernel:
131 if now:
137 if now:
132 self.kill_kernel()
138 self.kill_kernel()
133 else:
139 else:
134 self.shutdown_kernel(restart=True)
140 self.shutdown_kernel(restart=True)
135 self.start_kernel(**self._launch_args)
141 self.start_kernel(**self._launch_args)
136
142
137 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
143 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
138 # unless there is some delay here.
144 # unless there is some delay here.
139 if sys.platform == 'win32':
145 if sys.platform == 'win32':
140 time.sleep(0.2)
146 time.sleep(0.2)
141
147
142 @property
148 @property
143 def has_kernel(self):
149 def has_kernel(self):
144 """Returns whether a kernel process has been specified for the kernel
150 """Returns whether a kernel process has been specified for the kernel
145 manager.
151 manager.
146 """
152 """
147 return self.kernel is not None
153 return self.kernel is not None
148
154
149 def kill_kernel(self):
155 def kill_kernel(self):
150 """ Kill the running kernel. """
156 """ Kill the running kernel. """
151 if self.has_kernel:
157 if self.has_kernel:
152 # Pause the heart beat channel if it exists.
158 # Pause the heart beat channel if it exists.
153 if self._hb_channel is not None:
159 if self._hb_channel is not None:
154 self._hb_channel.pause()
160 self._hb_channel.pause()
155
161
156 # Attempt to kill the kernel.
162 # Attempt to kill the kernel.
157 try:
163 try:
158 self.kernel.kill()
164 self.kernel.kill()
159 except OSError as e:
165 except OSError as e:
160 # In Windows, we will get an Access Denied error if the process
166 # In Windows, we will get an Access Denied error if the process
161 # has already terminated. Ignore it.
167 # has already terminated. Ignore it.
162 if not (sys.platform == 'win32' and e.winerror == 5):
168 if not (sys.platform == 'win32' and e.winerror == 5):
163 raise
169 raise
164 self.kernel = None
170 self.kernel = None
165 else:
171 else:
166 raise RuntimeError("Cannot kill kernel. No kernel is running!")
172 raise RuntimeError("Cannot kill kernel. No kernel is running!")
167
173
168 def interrupt_kernel(self):
174 def interrupt_kernel(self):
169 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
175 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
170 well supported on all platforms.
176 well supported on all platforms.
171 """
177 """
172 if self.has_kernel:
178 if self.has_kernel:
173 if sys.platform == 'win32':
179 if sys.platform == 'win32':
174 from parentpoller import ParentPollerWindows as Poller
180 from parentpoller import ParentPollerWindows as Poller
175 Poller.send_interrupt(self.kernel.win32_interrupt_event)
181 Poller.send_interrupt(self.kernel.win32_interrupt_event)
176 else:
182 else:
177 self.kernel.send_signal(signal.SIGINT)
183 self.kernel.send_signal(signal.SIGINT)
178 else:
184 else:
179 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
185 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
180
186
181 def signal_kernel(self, signum):
187 def signal_kernel(self, signum):
182 """ Sends a signal to the kernel. Note that since only SIGTERM is
188 """ Sends a signal to the kernel. Note that since only SIGTERM is
183 supported on Windows, this function is only useful on Unix systems.
189 supported on Windows, this function is only useful on Unix systems.
184 """
190 """
185 if self.has_kernel:
191 if self.has_kernel:
186 self.kernel.send_signal(signum)
192 self.kernel.send_signal(signum)
187 else:
193 else:
188 raise RuntimeError("Cannot signal kernel. No kernel is running!")
194 raise RuntimeError("Cannot signal kernel. No kernel is running!")
189
195
190 @property
196 @property
191 def is_alive(self):
197 def is_alive(self):
192 """Is the kernel process still running?"""
198 """Is the kernel process still running?"""
193 # FIXME: not using a heartbeat means this method is broken for any
199 # FIXME: not using a heartbeat means this method is broken for any
194 # remote kernel, it's only capable of handling local kernels.
200 # remote kernel, it's only capable of handling local kernels.
195 if self.has_kernel:
201 if self.has_kernel:
196 if self.kernel.poll() is None:
202 if self.kernel.poll() is None:
197 return True
203 return True
198 else:
204 else:
199 return False
205 return False
200 else:
206 else:
201 # We didn't start the kernel with this KernelManager so we don't
207 # We didn't start the kernel with this KernelManager so we don't
202 # know if it is running. We should use a heartbeat for this case.
208 # know if it is running. We should use a heartbeat for this case.
203 return True
209 return True
204
210
205
211
206 def make_starter(up_addr, down_addr, *args, **kwargs):
212 def make_starter(up_addr, down_addr, *args, **kwargs):
207 """entry point function for launching a kernelstarter in a subprocess"""
213 """entry point function for launching a kernelstarter in a subprocess"""
208 loop = ioloop.IOLoop.instance()
214 loop = ioloop.IOLoop.instance()
209 ctx = zmq.Context()
215 ctx = zmq.Context()
210 session = StreamSession()
216 session = StreamSession()
211 upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
217 upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
212 upstream.connect(up_addr)
218 upstream.connect(up_addr)
213 downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
219 downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
214 downstream.connect(down_addr)
220 downstream.connect(down_addr)
215
221
216 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
222 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
217 starter.start()
223 starter.start()
218 loop.start()
224 loop.start()
219 No newline at end of file
225
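A usage sketch, with placeholder addresses; the endpoints are the two XREQ sockets connected above:

if __name__ == '__main__':
    # wire a KernelStarter between two endpoints (addresses illustrative)
    make_starter('tcp://127.0.0.1:5557', 'tcp://127.0.0.1:5558')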
@@ -1,95 +1,101 b''
1 """RemoteNamespace object, for dict style interaction with a remote
1 """RemoteNamespace object, for dict style interaction with a remote
2 execution kernel."""
2 execution kernel."""
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2010-2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-----------------------------------------------------------------------------
3
9
4 from functools import wraps
10 from functools import wraps
5 from IPython.external.decorator import decorator
11 from IPython.external.decorator import decorator
6
12
7 def _clear():
13 def _clear():
8 globals().clear()
14 globals().clear()
9
15
10 @decorator
16 @decorator
11 def spinfirst(f, self, *args, **kwargs):
17 def spinfirst(f, self, *args, **kwargs):
12 """call self.spin() before invoking the wrapped method."""
18 """call self.spin() before invoking the wrapped method."""
13 self.spin()
19 self.spin()
14 return f(self, *args, **kwargs)
20 return f(self, *args, **kwargs)
17
23
18 @decorator
24 @decorator
19 def myblock(f, self, *args, **kwargs):
25 def myblock(f, self, *args, **kwargs):
20 block = self.client.block
26 block = self.client.block
21 self.client.block = self.block
27 self.client.block = self.block
22 ret = f(self, *args, **kwargs)
28 ret = f(self, *args, **kwargs)
23 self.client.block = block
29 self.client.block = block
24 return ret
30 return ret
25
31
26 class RemoteNamespace(object):
32 class RemoteNamespace(object):
27 """A RemoteNamespace object, providing dictionary
33 """A RemoteNamespace object, providing dictionary
28 access to an engine via an IPython.zmq.client object.
34 access to an engine via an IPython.zmq.client object.
29 """
35 """
32 client = None
38 client = None
33 queue = None
39 queue = None
34 id = None
40 id = None
35 block = False
41 block = False
36
42
37 def __init__(self, client, id):
43 def __init__(self, client, id):
38 self.client = client
44 self.client = client
39 self.id = id
45 self.id = id
40 self.block = client.block # initial state is same as client
46 self.block = client.block # initial state is same as client
41
47
42 def __repr__(self):
48 def __repr__(self):
43 return "<RemoteNamespace[%i]>"%self.id
49 return "<RemoteNamespace[%i]>"%self.id
44
50
45 @myblock
51 @myblock
46 def apply(self, f, *args, **kwargs):
52 def apply(self, f, *args, **kwargs):
47 """call f(*args, **kwargs) in remote namespace
53 """call f(*args, **kwargs) in remote namespace
48
54
49 This method has no access to the user namespace"""
55 This method has no access to the user namespace"""
50 return self.client.apply_to(self.id, f, *args, **kwargs)
56 return self.client.apply_to(self.id, f, *args, **kwargs)
51
57
52 @myblock
58 @myblock
53 def apply_bound(self, f, *args, **kwargs):
59 def apply_bound(self, f, *args, **kwargs):
54 """call `f(*args, **kwargs)` in remote namespace.
60 """call `f(*args, **kwargs)` in remote namespace.
55
61
56 `f` will have access to the user namespace as globals()."""
62 `f` will have access to the user namespace as globals()."""
57 return self.client.apply_bound_to(self.id, f, *args, **kwargs)
63 return self.client.apply_bound_to(self.id, f, *args, **kwargs)
58
64
59 @myblock
65 @myblock
60 def update(self, ns):
66 def update(self, ns):
61 """update remote namespace with dict `ns`"""
67 """update remote namespace with dict `ns`"""
62 return self.client.push(self.id, ns, self.block)
68 return self.client.push(self.id, ns, self.block)
63
69
64 def get(self, key_s):
70 def get(self, key_s):
65 """get object(s) by `key_s` from remote namespace
71 """get object(s) by `key_s` from remote namespace
66 will return one object if it is a key.
72 will return one object if it is a key.
67 It also takes a list of keys, and will return a list of objects."""
73 It also takes a list of keys, and will return a list of objects."""
68 return self.client.pull(self.id, key_s, self.block)
74 return self.client.pull(self.id, key_s, self.block)
69
75
70 push = update
76 push = update
71 pull = get
77 pull = get
72
78
73 def __getitem__(self, key):
79 def __getitem__(self, key):
74 return self.get(key)
80 return self.get(key)
75
81
76 def __setitem__(self,key,value):
82 def __setitem__(self,key,value):
77 self.update({key:value})
83 self.update({key:value})
78
84
79 def clear(self):
85 def clear(self):
80 """clear the remote namespace"""
86 """clear the remote namespace"""
81 return self.client.apply_bound_to(self.id, _clear)
87 return self.client.apply_bound_to(self.id, _clear)
82
88
83 @decorator
89 @decorator
84 def withme(self, toapply):
90 def withme(self, toapply):
85 """for use as a decorator, this turns a function into
91 """for use as a decorator, this turns a function into
86 one that executes remotely."""
92 one that executes remotely."""
87 @wraps(toapply)
93 @wraps(toapply)
88 def applied(self, *args, **kwargs):
94 def applied(self, *args, **kwargs):
89 return self.apply_bound(toapply, *args, **kwargs)
95 return self.apply_bound(toapply, *args, **kwargs)
90 return applied
96 return applied
91
97
92
98
93
99
94
100
95
101
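A dict-style session sketch, assuming client is a connected IPython.zmq client object exposing the push/pull/apply_bound_to methods used above:

ns = RemoteNamespace(client, 0)  # namespace of engine 0
ns['a'] = 10                     # push {'a': 10} to the engine
ns.update({'b': 2, 'c': 3})      # push several names at once
print(ns['a'])                   # pull 'a' back
ns.clear()                       # wipe the engine's namespace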
@@ -1,584 +1,590 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
7
13
8 #----------------------------------------------------------------------
14 #----------------------------------------------------------------------
9 # Imports
15 # Imports
10 #----------------------------------------------------------------------
16 #----------------------------------------------------------------------
11
17
12 from __future__ import print_function
18 from __future__ import print_function
13
19
14 import logging
20 import logging
15 import sys
21 import sys
16
22
17 from datetime import datetime, timedelta
23 from datetime import datetime, timedelta
18 from random import randint, random
24 from random import randint, random
19 from types import FunctionType
25 from types import FunctionType
20
26
21 try:
27 try:
22 import numpy
28 import numpy
23 except ImportError:
29 except ImportError:
24 numpy = None
30 numpy = None
25
31
26 import zmq
32 import zmq
27 from zmq.eventloop import ioloop, zmqstream
33 from zmq.eventloop import ioloop, zmqstream
28
34
29 # local imports
35 # local imports
30 from IPython.external.decorator import decorator
36 from IPython.external.decorator import decorator
31 from IPython.utils.traitlets import Instance, Dict, List, Set
37 from IPython.utils.traitlets import Instance, Dict, List, Set
32
38
33 from . import error
39 from . import error
34 from .dependency import Dependency
40 from .dependency import Dependency
35 from .entry_point import connect_logger, local_logger
41 from .entry_point import connect_logger, local_logger
36 from .factory import SessionFactory
42 from .factory import SessionFactory
37
43
38
44
39 @decorator
45 @decorator
40 def logged(f,self,*args,**kwargs):
46 def logged(f,self,*args,**kwargs):
41 # print ("#--------------------")
47 # print ("#--------------------")
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
48 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
43 # print ("#--")
49 # print ("#--")
44 return f(self,*args, **kwargs)
50 return f(self,*args, **kwargs)
45
51
46 #----------------------------------------------------------------------
52 #----------------------------------------------------------------------
47 # Chooser functions
53 # Chooser functions
48 #----------------------------------------------------------------------
54 #----------------------------------------------------------------------
49
55
50 def plainrandom(loads):
56 def plainrandom(loads):
51 """Plain random pick."""
57 """Plain random pick."""
52 n = len(loads)
58 n = len(loads)
53 return randint(0,n-1)
59 return randint(0,n-1)
54
60
55 def lru(loads):
61 def lru(loads):
56 """Always pick the front of the line.
62 """Always pick the front of the line.
57
63
58 The content of `loads` is ignored.
64 The content of `loads` is ignored.
59
65
60 Assumes LRU ordering of loads, with oldest first.
66 Assumes LRU ordering of loads, with oldest first.
61 """
67 """
62 return 0
68 return 0
63
69
64 def twobin(loads):
70 def twobin(loads):
65 """Pick two at random, use the LRU of the two.
71 """Pick two at random, use the LRU of the two.
66
72
67 The content of loads is ignored.
73 The content of loads is ignored.
68
74
69 Assumes LRU ordering of loads, with oldest first.
75 Assumes LRU ordering of loads, with oldest first.
70 """
76 """
71 n = len(loads)
77 n = len(loads)
72 a = randint(0,n-1)
78 a = randint(0,n-1)
73 b = randint(0,n-1)
79 b = randint(0,n-1)
74 return min(a,b)
80 return min(a,b)
75
81
76 def weighted(loads):
82 def weighted(loads):
77 """Pick two at random using inverse load as weight.
83 """Pick two at random using inverse load as weight.
78
84
79 Return the less loaded of the two.
85 Return the less loaded of the two.
80 """
86 """
81 # weight 0 a million times more than 1:
87 # weight 0 a million times more than 1:
82 weights = 1./(1e-6+numpy.array(loads))
88 weights = 1./(1e-6+numpy.array(loads))
83 sums = weights.cumsum()
89 sums = weights.cumsum()
84 t = sums[-1]
90 t = sums[-1]
85 x = random()*t
91 x = random()*t
86 y = random()*t
92 y = random()*t
87 idx = 0
93 idx = 0
88 idy = 0
94 idy = 0
89 while sums[idx] < x:
95 while sums[idx] < x:
90 idx += 1
96 idx += 1
91 while sums[idy] < y:
97 while sums[idy] < y:
92 idy += 1
98 idy += 1
93 if weights[idy] > weights[idx]:
99 if weights[idy] > weights[idx]:
94 return idy
100 return idy
95 else:
101 else:
96 return idx
102 return idx
97
103
98 def leastload(loads):
104 def leastload(loads):
99 """Always choose the lowest load.
105 """Always choose the lowest load.
100
106
101 If the lowest load occurs more than once, the first
107 If the lowest load occurs more than once, the first
102 occurrence will be used. If loads has LRU ordering, this means
108 occurrence will be used. If loads has LRU ordering, this means
103 the LRU of those with the lowest load is chosen.
109 the LRU of those with the lowest load is chosen.
104 """
110 """
105 return loads.index(min(loads))
111 return loads.index(min(loads))
106
112
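To make the policies concrete, here is how each chooser treats a made-up, LRU-ordered load list (weighted additionally requires numpy):

loads = [3, 0, 1, 2]    # hypothetical queue depths, oldest engine first
lru(loads)              # -> 0: always the front of the line
leastload(loads)        # -> 1: index of the minimum load
plainrandom(loads)      # -> uniform over 0..3
twobin(loads)           # -> min of two random picks ("power of two choices")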
107 #---------------------------------------------------------------------
113 #---------------------------------------------------------------------
108 # Classes
114 # Classes
109 #---------------------------------------------------------------------
115 #---------------------------------------------------------------------
110 # store empty default dependency:
116 # store empty default dependency:
111 MET = Dependency([])
117 MET = Dependency([])
112
118
113 class TaskScheduler(SessionFactory):
119 class TaskScheduler(SessionFactory):
114 """Python TaskScheduler object.
120 """Python TaskScheduler object.
115
121
116 This is the simplest object that supports msg_id based
122 This is the simplest object that supports msg_id based
117 DAG dependencies. *Only* task msg_ids are checked, not
123 DAG dependencies. *Only* task msg_ids are checked, not
118 msg_ids of jobs submitted via the MUX queue.
124 msg_ids of jobs submitted via the MUX queue.
119
125
120 """
126 """
121
127
122 # input arguments:
128 # input arguments:
123 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
129 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
124 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
130 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
131 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
132 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
133 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128
134
129 # internals:
135 # internals:
130 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
136 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
137 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
132 pending = Dict() # dict by engine_uuid of submitted tasks
138 pending = Dict() # dict by engine_uuid of submitted tasks
133 completed = Dict() # dict by engine_uuid of completed tasks
139 completed = Dict() # dict by engine_uuid of completed tasks
134 failed = Dict() # dict by engine_uuid of failed tasks
140 failed = Dict() # dict by engine_uuid of failed tasks
135 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
141 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
136 clients = Dict() # dict by msg_id for who submitted the task
142 clients = Dict() # dict by msg_id for who submitted the task
137 targets = List() # list of target IDENTs
143 targets = List() # list of target IDENTs
138 loads = List() # list of engine loads
144 loads = List() # list of engine loads
139 all_completed = Set() # set of all completed tasks
145 all_completed = Set() # set of all completed tasks
140 all_failed = Set() # set of all failed tasks
146 all_failed = Set() # set of all failed tasks
141 all_done = Set() # set of all finished tasks=union(completed,failed)
147 all_done = Set() # set of all finished tasks=union(completed,failed)
142 all_ids = Set() # set of all submitted task IDs
148 all_ids = Set() # set of all submitted task IDs
143 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
149 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
144 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
150 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
145
151
146
152
147 def start(self):
153 def start(self):
148 self.engine_stream.on_recv(self.dispatch_result, copy=False)
154 self.engine_stream.on_recv(self.dispatch_result, copy=False)
149 self._notification_handlers = dict(
155 self._notification_handlers = dict(
150 registration_notification = self._register_engine,
156 registration_notification = self._register_engine,
151 unregistration_notification = self._unregister_engine
157 unregistration_notification = self._unregister_engine
152 )
158 )
153 self.notifier_stream.on_recv(self.dispatch_notification)
159 self.notifier_stream.on_recv(self.dispatch_notification)
154 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s
160 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s
155 self.auditor.start()
161 self.auditor.start()
156 self.log.info("Scheduler started...%r"%self)
162 self.log.info("Scheduler started...%r"%self)
157
163
158 def resume_receiving(self):
164 def resume_receiving(self):
159 """Resume accepting jobs."""
165 """Resume accepting jobs."""
160 self.client_stream.on_recv(self.dispatch_submission, copy=False)
166 self.client_stream.on_recv(self.dispatch_submission, copy=False)
161
167
162 def stop_receiving(self):
168 def stop_receiving(self):
163 """Stop accepting jobs while there are no engines.
169 """Stop accepting jobs while there are no engines.
164 Leave them in the ZMQ queue."""
170 Leave them in the ZMQ queue."""
165 self.client_stream.on_recv(None)
171 self.client_stream.on_recv(None)
166
172
167 #-----------------------------------------------------------------------
173 #-----------------------------------------------------------------------
168 # [Un]Registration Handling
174 # [Un]Registration Handling
169 #-----------------------------------------------------------------------
175 #-----------------------------------------------------------------------
170
176
171 def dispatch_notification(self, msg):
177 def dispatch_notification(self, msg):
172 """dispatch register/unregister events."""
178 """dispatch register/unregister events."""
173 idents,msg = self.session.feed_identities(msg)
179 idents,msg = self.session.feed_identities(msg)
174 msg = self.session.unpack_message(msg)
180 msg = self.session.unpack_message(msg)
175 msg_type = msg['msg_type']
181 msg_type = msg['msg_type']
176 handler = self._notification_handlers.get(msg_type, None)
182 handler = self._notification_handlers.get(msg_type, None)
177 if handler is None:
183 if handler is None:
178 raise Exception("Unhandled message type: %s"%msg_type)
184 raise Exception("Unhandled message type: %s"%msg_type)
179 else:
185 else:
180 try:
186 try:
181 handler(str(msg['content']['queue']))
187 handler(str(msg['content']['queue']))
182 except KeyError:
188 except KeyError:
183 self.log.error("task::Invalid notification msg: %s"%msg)
189 self.log.error("task::Invalid notification msg: %s"%msg)
184
190
185 @logged
191 @logged
186 def _register_engine(self, uid):
192 def _register_engine(self, uid):
187 """New engine with ident `uid` became available."""
193 """New engine with ident `uid` became available."""
188 # head of the line:
194 # head of the line:
189 self.targets.insert(0,uid)
195 self.targets.insert(0,uid)
190 self.loads.insert(0,0)
196 self.loads.insert(0,0)
191 # initialize sets
197 # initialize sets
192 self.completed[uid] = set()
198 self.completed[uid] = set()
193 self.failed[uid] = set()
199 self.failed[uid] = set()
194 self.pending[uid] = {}
200 self.pending[uid] = {}
195 if len(self.targets) == 1:
201 if len(self.targets) == 1:
196 self.resume_receiving()
202 self.resume_receiving()
197
203
198 def _unregister_engine(self, uid):
204 def _unregister_engine(self, uid):
199 """Existing engine with ident `uid` became unavailable."""
205 """Existing engine with ident `uid` became unavailable."""
200 if len(self.targets) == 1:
206 if len(self.targets) == 1:
201 # this was our only engine
207 # this was our only engine
202 self.stop_receiving()
208 self.stop_receiving()
203
209
204 # handle any potentially finished tasks:
210 # handle any potentially finished tasks:
205 self.engine_stream.flush()
211 self.engine_stream.flush()
206
212
207 self.completed.pop(uid)
213 self.completed.pop(uid)
208 self.failed.pop(uid)
214 self.failed.pop(uid)
209 # don't pop destinations, because it might be used later
215 # don't pop destinations, because it might be used later
210 # map(self.destinations.pop, self.completed.pop(uid))
216 # map(self.destinations.pop, self.completed.pop(uid))
211 # map(self.destinations.pop, self.failed.pop(uid))
217 # map(self.destinations.pop, self.failed.pop(uid))
212
218
213 idx = self.targets.index(uid)
219 idx = self.targets.index(uid)
214 self.targets.pop(idx)
220 self.targets.pop(idx)
215 self.loads.pop(idx)
221 self.loads.pop(idx)
216
222
217 # wait 5 seconds before cleaning up pending jobs, since the results might
223 # wait 5 seconds before cleaning up pending jobs, since the results might
218 # still be incoming
224 # still be incoming
219 if self.pending[uid]:
225 if self.pending[uid]:
220 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
226 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
221 dc.start()
227 dc.start()
222
228
223 @logged
229 @logged
224 def handle_stranded_tasks(self, engine):
230 def handle_stranded_tasks(self, engine):
225 """Deal with jobs resident in an engine that died."""
231 """Deal with jobs resident in an engine that died."""
226 lost = self.pending.pop(engine)
232 lost = self.pending.pop(engine)
227
233
228 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
234 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
229 self.all_failed.add(msg_id)
235 self.all_failed.add(msg_id)
230 self.all_done.add(msg_id)
236 self.all_done.add(msg_id)
231 idents,msg = self.session.feed_identities(raw_msg, copy=False)
237 idents,msg = self.session.feed_identities(raw_msg, copy=False)
232 msg = self.session.unpack_message(msg, copy=False, content=False)
238 msg = self.session.unpack_message(msg, copy=False, content=False)
233 parent = msg['header']
239 parent = msg['header']
234 idents = [idents[0],engine]+idents[1:]
240 idents = [idents[0],engine]+idents[1:]
235 # print (idents)
241 # print (idents)
236 try:
242 try:
237 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
243 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
238 except:
244 except:
239 content = error.wrap_exception()
245 content = error.wrap_exception()
240 msg = self.session.send(self.client_stream, 'apply_reply', content,
246 msg = self.session.send(self.client_stream, 'apply_reply', content,
241 parent=parent, ident=idents)
247 parent=parent, ident=idents)
242 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
248 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
243 self.update_graph(msg_id)
249 self.update_graph(msg_id)
244
250
245
251
246 #-----------------------------------------------------------------------
252 #-----------------------------------------------------------------------
247 # Job Submission
253 # Job Submission
248 #-----------------------------------------------------------------------
254 #-----------------------------------------------------------------------
249 @logged
255 @logged
250 def dispatch_submission(self, raw_msg):
256 def dispatch_submission(self, raw_msg):
251 """Dispatch job submission to appropriate handlers."""
257 """Dispatch job submission to appropriate handlers."""
252 # ensure targets up to date:
258 # ensure targets up to date:
253 self.notifier_stream.flush()
259 self.notifier_stream.flush()
254 try:
260 try:
255 idents, msg = self.session.feed_identities(raw_msg, copy=False)
261 idents, msg = self.session.feed_identities(raw_msg, copy=False)
256 msg = self.session.unpack_message(msg, content=False, copy=False)
262 msg = self.session.unpack_message(msg, content=False, copy=False)
257 except:
263 except:
258 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
264 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
259 return
265 return
260
266
261 # send to monitor
267 # send to monitor
262 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
268 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
263
269
264 header = msg['header']
270 header = msg['header']
265 msg_id = header['msg_id']
271 msg_id = header['msg_id']
266 self.all_ids.add(msg_id)
272 self.all_ids.add(msg_id)
267
273
268 # targets
274 # targets
269 targets = set(header.get('targets', []))
275 targets = set(header.get('targets', []))
270
276
271 # time dependencies
277 # time dependencies
272 after = Dependency(header.get('after', []))
278 after = Dependency(header.get('after', []))
273 if after.all:
279 if after.all:
274 after.difference_update(self.all_completed)
280 after.difference_update(self.all_completed)
275 if not after.success_only:
281 if not after.success_only:
276 after.difference_update(self.all_failed)
282 after.difference_update(self.all_failed)
277 if after.check(self.all_completed, self.all_failed):
283 if after.check(self.all_completed, self.all_failed):
278 # recast as empty set, if `after` already met,
284 # recast as empty set, if `after` already met,
279 # to prevent unnecessary set comparisons
285 # to prevent unnecessary set comparisons
280 after = MET
286 after = MET
281
287
282 # location dependencies
288 # location dependencies
283 follow = Dependency(header.get('follow', []))
289 follow = Dependency(header.get('follow', []))
284
290
285 # turn timeouts into datetime objects:
291 # turn timeouts into datetime objects:
286 timeout = header.get('timeout', None)
292 timeout = header.get('timeout', None)
287 if timeout:
293 if timeout:
288 timeout = datetime.now() + timedelta(0,timeout,0)
294 timeout = datetime.now() + timedelta(0,timeout,0)
289
295
290 args = [raw_msg, targets, after, follow, timeout]
296 args = [raw_msg, targets, after, follow, timeout]
291
297
292 # validate and reduce dependencies:
298 # validate and reduce dependencies:
293 for dep in after,follow:
299 for dep in after,follow:
294 # check valid:
300 # check valid:
295 if msg_id in dep or dep.difference(self.all_ids):
301 if msg_id in dep or dep.difference(self.all_ids):
296 self.depending[msg_id] = args
302 self.depending[msg_id] = args
297 return self.fail_unreachable(msg_id, error.InvalidDependency)
303 return self.fail_unreachable(msg_id, error.InvalidDependency)
298 # check if unreachable:
304 # check if unreachable:
299 if dep.unreachable(self.all_failed):
305 if dep.unreachable(self.all_failed):
300 self.depending[msg_id] = args
306 self.depending[msg_id] = args
301 return self.fail_unreachable(msg_id)
307 return self.fail_unreachable(msg_id)
302
308
303 if after.check(self.all_completed, self.all_failed):
309 if after.check(self.all_completed, self.all_failed):
304 # time deps already met, try to run
310 # time deps already met, try to run
305 if not self.maybe_run(msg_id, *args):
311 if not self.maybe_run(msg_id, *args):
306 # can't run yet
312 # can't run yet
307 self.save_unmet(msg_id, *args)
313 self.save_unmet(msg_id, *args)
308 else:
314 else:
309 self.save_unmet(msg_id, *args)
315 self.save_unmet(msg_id, *args)
310
316
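For orientation, the scheduling-relevant fields read from the header above look roughly like this (all values illustrative):

header = {
    'msg_id'  : 'task-42',
    'targets' : [],                     # restrict to specific engine idents
    'after'   : ['task-40', 'task-41'], # time dependencies (msg_ids)
    'follow'  : ['task-40'],            # location dependencies (msg_ids)
    'timeout' : 10,                     # seconds before fail_unreachable
}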
311 # @logged
317 # @logged
312 def audit_timeouts(self):
318 def audit_timeouts(self):
313 """Audit all waiting tasks for expired timeouts."""
319 """Audit all waiting tasks for expired timeouts."""
314 now = datetime.now()
320 now = datetime.now()
315 for msg_id in self.depending.keys():
321 for msg_id in self.depending.keys():
316 # must recheck, in case one failure cascaded to another:
322 # must recheck, in case one failure cascaded to another:
317 if msg_id in self.depending:
323 if msg_id in self.depending:
318 raw,targets,after,follow,timeout = self.depending[msg_id]
324 raw,targets,after,follow,timeout = self.depending[msg_id]
319 if timeout and timeout < now:
325 if timeout and timeout < now:
320 self.fail_unreachable(msg_id, timeout=True)
326 self.fail_unreachable(msg_id, timeout=True)
321
327
322 @logged
328 @logged
323 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
329 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
324 """a task has become unreachable, send a reply with an ImpossibleDependency
330 """a task has become unreachable, send a reply with an ImpossibleDependency
325 error."""
331 error."""
326 if msg_id not in self.depending:
332 if msg_id not in self.depending:
327 self.log.error("msg %r already failed!"%msg_id)
333 self.log.error("msg %r already failed!"%msg_id)
328 return
334 return
329 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
335 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
330 for mid in follow.union(after):
336 for mid in follow.union(after):
331 if mid in self.graph:
337 if mid in self.graph:
332 self.graph[mid].remove(msg_id)
338 self.graph[mid].remove(msg_id)
333
339
334 # FIXME: unpacking a message I've already unpacked, but didn't save:
340 # FIXME: unpacking a message I've already unpacked, but didn't save:
335 idents,msg = self.session.feed_identities(raw_msg, copy=False)
341 idents,msg = self.session.feed_identities(raw_msg, copy=False)
336 msg = self.session.unpack_message(msg, copy=False, content=False)
342 msg = self.session.unpack_message(msg, copy=False, content=False)
337 header = msg['header']
343 header = msg['header']
338
344
339 try:
345 try:
340 raise why()
346 raise why()
341 except:
347 except:
342 content = error.wrap_exception()
348 content = error.wrap_exception()
343
349
344 self.all_done.add(msg_id)
350 self.all_done.add(msg_id)
345 self.all_failed.add(msg_id)
351 self.all_failed.add(msg_id)
346
352
347 msg = self.session.send(self.client_stream, 'apply_reply', content,
353 msg = self.session.send(self.client_stream, 'apply_reply', content,
348 parent=header, ident=idents)
354 parent=header, ident=idents)
349 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
355 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
350
356
351 self.update_graph(msg_id, success=False)
357 self.update_graph(msg_id, success=False)
352
358
353 @logged
359 @logged
354 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
360 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
355 """check location dependencies, and run if they are met."""
361 """check location dependencies, and run if they are met."""
356 blacklist = self.blacklist.setdefault(msg_id, set())
362 blacklist = self.blacklist.setdefault(msg_id, set())
357 if follow or targets or blacklist:
363 if follow or targets or blacklist:
358 # we need a can_run filter
364 # we need a can_run filter
359 def can_run(idx):
365 def can_run(idx):
360 target = self.targets[idx]
366 target = self.targets[idx]
361 # check targets
367 # check targets
362 if targets and target not in targets:
368 if targets and target not in targets:
363 return False
369 return False
364 # check blacklist
370 # check blacklist
365 if target in blacklist:
371 if target in blacklist:
366 return False
372 return False
367 # check follow
373 # check follow
368 return follow.check(self.completed[target], self.failed[target])
374 return follow.check(self.completed[target], self.failed[target])
369
375
370 indices = filter(can_run, range(len(self.targets)))
376 indices = filter(can_run, range(len(self.targets)))
371 if not indices:
377 if not indices:
372 # couldn't run
378 # couldn't run
373 if follow.all:
379 if follow.all:
374 # check follow for impossibility
380 # check follow for impossibility
375 dests = set()
381 dests = set()
376 relevant = self.all_completed if follow.success_only else self.all_done
382 relevant = self.all_completed if follow.success_only else self.all_done
377 for m in follow.intersection(relevant):
383 for m in follow.intersection(relevant):
378 dests.add(self.destinations[m])
384 dests.add(self.destinations[m])
379 if len(dests) > 1:
385 if len(dests) > 1:
380 self.fail_unreachable(msg_id)
386 self.fail_unreachable(msg_id)
381 return False
387 return False
382 if targets:
388 if targets:
383 # check blacklist+targets for impossibility
389 # check blacklist+targets for impossibility
384 targets.difference_update(blacklist)
390 targets.difference_update(blacklist)
385 if not targets or not targets.intersection(self.targets):
391 if not targets or not targets.intersection(self.targets):
386 self.fail_unreachable(msg_id)
392 self.fail_unreachable(msg_id)
387 return False
393 return False
388 return False
394 return False
389 else:
395 else:
390 indices = None
396 indices = None
391
397
392 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
398 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
393 return True
399 return True
394
400
395 @logged
401 @logged
396 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
402 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
397 """Save a message for later submission when its dependencies are met."""
403 """Save a message for later submission when its dependencies are met."""
398 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
404 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
399 # track the ids in follow or after, but not those already finished
405 # track the ids in follow or after, but not those already finished
400 for dep_id in after.union(follow).difference(self.all_done):
406 for dep_id in after.union(follow).difference(self.all_done):
401 if dep_id not in self.graph:
407 if dep_id not in self.graph:
402 self.graph[dep_id] = set()
408 self.graph[dep_id] = set()
403 self.graph[dep_id].add(msg_id)
409 self.graph[dep_id].add(msg_id)
404
410
405 @logged
411 @logged
406 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
412 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
407 """Submit a task to any of a subset of our targets."""
413 """Submit a task to any of a subset of our targets."""
408 if indices:
414 if indices:
409 loads = [self.loads[i] for i in indices]
415 loads = [self.loads[i] for i in indices]
410 else:
416 else:
411 loads = self.loads
417 loads = self.loads
412 idx = self.scheme(loads)
418 idx = self.scheme(loads)
413 if indices:
419 if indices:
414 idx = indices[idx]
420 idx = indices[idx]
415 target = self.targets[idx]
421 target = self.targets[idx]
416 # print (target, map(str, msg[:3]))
422 # print (target, map(str, msg[:3]))
417 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
423 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
418 self.engine_stream.send_multipart(raw_msg, copy=False)
424 self.engine_stream.send_multipart(raw_msg, copy=False)
419 self.add_job(idx)
425 self.add_job(idx)
420 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
426 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
421 content = dict(msg_id=msg_id, engine_id=target)
427 content = dict(msg_id=msg_id, engine_id=target)
422 self.session.send(self.mon_stream, 'task_destination', content=content,
428 self.session.send(self.mon_stream, 'task_destination', content=content,
423 ident=['tracktask',self.session.session])
429 ident=['tracktask',self.session.session])
424
430
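The double use of indices above is the one subtle step: the chooser only ever sees a dense list of loads, so its pick has to be mapped back to a real target slot whenever a can_run filter was applied. Distilled into a standalone sketch (not part of the file):

def pick(scheme, loads, indices=None):
    # scheme sees only the filtered loads; indices maps its pick back
    sub = [loads[i] for i in indices] if indices else loads
    idx = scheme(sub)              # position within the filtered list
    return indices[idx] if indices else idx

pick(leastload, [3, 0, 1, 2], indices=[0, 2, 3])  # -> 2 (the slot with load 1)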
425 #-----------------------------------------------------------------------
431 #-----------------------------------------------------------------------
426 # Result Handling
432 # Result Handling
427 #-----------------------------------------------------------------------
433 #-----------------------------------------------------------------------
428 @logged
434 @logged
429 def dispatch_result(self, raw_msg):
435 def dispatch_result(self, raw_msg):
430 """dispatch method for result replies"""
436 """dispatch method for result replies"""
431 try:
437 try:
432 idents,msg = self.session.feed_identities(raw_msg, copy=False)
438 idents,msg = self.session.feed_identities(raw_msg, copy=False)
433 msg = self.session.unpack_message(msg, content=False, copy=False)
439 msg = self.session.unpack_message(msg, content=False, copy=False)
434 except:
440 except:
435 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
441 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
436 return
442 return
437
443
438 header = msg['header']
444 header = msg['header']
439 if header.get('dependencies_met', True):
445 if header.get('dependencies_met', True):
440 success = (header['status'] == 'ok')
446 success = (header['status'] == 'ok')
441 self.handle_result(idents, msg['parent_header'], raw_msg, success)
447 self.handle_result(idents, msg['parent_header'], raw_msg, success)
442 # send to Hub monitor
448 # send to Hub monitor
443 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
449 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
444 else:
450 else:
445 self.handle_unmet_dependency(idents, msg['parent_header'])
451 self.handle_unmet_dependency(idents, msg['parent_header'])
446
452
447 @logged
453 @logged
448 def handle_result(self, idents, parent, raw_msg, success=True):
454 def handle_result(self, idents, parent, raw_msg, success=True):
449 """handle a real task result, either success or failure"""
455 """handle a real task result, either success or failure"""
450 # first, relay result to client
456 # first, relay result to client
451 engine = idents[0]
457 engine = idents[0]
452 client = idents[1]
458 client = idents[1]
453 # swap_ids for XREP-XREP mirror
459 # swap_ids for XREP-XREP mirror
454 raw_msg[:2] = [client,engine]
460 raw_msg[:2] = [client,engine]
455 # print (map(str, raw_msg[:4]))
461 # print (map(str, raw_msg[:4]))
456 self.client_stream.send_multipart(raw_msg, copy=False)
462 self.client_stream.send_multipart(raw_msg, copy=False)
457 # now, update our data structures
463 # now, update our data structures
458 msg_id = parent['msg_id']
464 msg_id = parent['msg_id']
459 self.blacklist.pop(msg_id, None)
465 self.blacklist.pop(msg_id, None)
460 self.pending[engine].pop(msg_id)
466 self.pending[engine].pop(msg_id)
461 if success:
467 if success:
462 self.completed[engine].add(msg_id)
468 self.completed[engine].add(msg_id)
463 self.all_completed.add(msg_id)
469 self.all_completed.add(msg_id)
464 else:
470 else:
465 self.failed[engine].add(msg_id)
471 self.failed[engine].add(msg_id)
466 self.all_failed.add(msg_id)
472 self.all_failed.add(msg_id)
467 self.all_done.add(msg_id)
473 self.all_done.add(msg_id)
468 self.destinations[msg_id] = engine
474 self.destinations[msg_id] = engine
469
475
470 self.update_graph(msg_id, success)
476 self.update_graph(msg_id, success)
471
477
472 @logged
478 @logged
473 def handle_unmet_dependency(self, idents, parent):
479 def handle_unmet_dependency(self, idents, parent):
474 """handle an unmet dependency"""
480 """handle an unmet dependency"""
475 engine = idents[0]
481 engine = idents[0]
476 msg_id = parent['msg_id']
482 msg_id = parent['msg_id']
477
483
478 if msg_id not in self.blacklist:
484 if msg_id not in self.blacklist:
479 self.blacklist[msg_id] = set()
485 self.blacklist[msg_id] = set()
480 self.blacklist[msg_id].add(engine)
486 self.blacklist[msg_id].add(engine)
481
487
482 args = self.pending[engine].pop(msg_id)
488 args = self.pending[engine].pop(msg_id)
483 raw,targets,after,follow,timeout = args
489 raw,targets,after,follow,timeout = args
484
490
485 if self.blacklist[msg_id] == targets:
491 if self.blacklist[msg_id] == targets:
486 self.depending[msg_id] = args
492 self.depending[msg_id] = args
487 return self.fail_unreachable(msg_id)
493 return self.fail_unreachable(msg_id)
488
494
489 elif not self.maybe_run(msg_id, *args):
495 elif not self.maybe_run(msg_id, *args):
490 # resubmit failed, put it back in our dependency tree
496 # resubmit failed, put it back in our dependency tree
491 self.save_unmet(msg_id, *args)
497 self.save_unmet(msg_id, *args)
492
498
493
499
494 @logged
500 @logged
495 def update_graph(self, dep_id, success=True):
501 def update_graph(self, dep_id, success=True):
496 """dep_id just finished. Update our dependency
502 """dep_id just finished. Update our dependency
497 graph and submit any jobs that just became runable."""
503 graph and submit any jobs that just became runable."""
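        # Shape of the bookkeeping walked below (illustrative, not from a real
        # run): self.graph maps a dependency msg_id to the set of msg_ids that
        # are still waiting on it, e.g. {'dep1' : set(['job_a', 'job_b'])},
        # while self.depending maps each waiting msg_id to its saved
        # (raw_msg, targets, after, follow, timeout) submission args.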
        # print ("\n\n***********")
        # pprint (dep_id)
        # pprint (self.graph)
        # pprint (self.depending)
        # pprint (self.all_completed)
        # pprint (self.all_failed)
        # print ("\n\n***********\n\n")
        if dep_id not in self.graph:
            return
        jobs = self.graph.pop(dep_id)

        for msg_id in jobs:
            raw_msg, targets, after, follow, timeout = self.depending[msg_id]
            # if dep_id in after:
            #     if after.all and (success or not after.success_only):
            #         after.remove(dep_id)

            if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
                self.fail_unreachable(msg_id)

            elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):

                    self.depending.pop(msg_id)
                    for mid in follow.union(after):
                        if mid in self.graph:
                            self.graph[mid].remove(msg_id)

    #----------------------------------------------------------------------
    # methods to be overridden by subclasses
    #----------------------------------------------------------------------

    def add_job(self, idx):
        """Called after self.targets[idx] just got the job with header.
        Override with subclasses. The default ordering is simple LRU.
        The default loads are the number of outstanding jobs."""
        self.loads[idx] += 1
        for lis in (self.targets, self.loads):
            lis.append(lis.pop(idx))
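        # Illustrative rotation (assumed starting state): with
        # targets=['a','b','c'] and loads=[0,0,0], add_job(0) leaves
        # targets=['b','c','a'] and loads=[0,0,1], so the engine that just
        # received work rotates to the back of the LRU order.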


    def finish_job(self, idx):
        """Called after self.targets[idx] just finished a job.
        Override with subclasses."""
        self.loads[idx] -= 1



def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
                            log_addr=None, loglevel=logging.DEBUG, scheme='lru',
                            identity=b'task'):
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    ctx = zmq.Context()
    loop = ioloop.IOLoop()
    print (in_addr, out_addr, mon_addr, not_addr)
    ins = ZMQStream(ctx.socket(zmq.XREP),loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    outs = ZMQStream(ctx.socket(zmq.XREP),loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)
    mons = ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    nots = ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, '')
    nots.connect(not_addr)

    scheme = globals().get(scheme, None)
    # setup logging
    if log_addr:
        connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
    else:
        local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            scheme=scheme, loop=loop, logname=logname,
                            config=config)
    scheduler.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print ("interrupted, exiting...", file=sys.__stderr__)
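
# Minimal usage sketch (the addresses are illustrative assumptions, not values
# from this changeset).  launch_scheduler blocks in loop.start(), so it is
# normally run in a separate process:
#
#     from multiprocessing import Process
#     p = Process(target=launch_scheduler,
#             args=('tcp://127.0.0.1:5551',    # in_addr: client-facing XREP
#                   'tcp://127.0.0.1:5552',    # out_addr: engine-facing XREP
#                   'tcp://127.0.0.1:5553',    # mon_addr: PUB to the Hub monitor
#                   'tcp://127.0.0.1:5554'))   # not_addr: SUB for notifications
#     p.start()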
@@ -1,487 +1,493 b''
#!/usr/bin/env python
"""
Kernel adapted from kernel.py to use ZMQ Streams
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports.
from __future__ import print_function

import sys
import time

from code import CommandCompiler
from datetime import datetime
from pprint import pprint
from signal import SIGTERM, SIGKILL

# System library imports.
import zmq
from zmq.eventloop import ioloop, zmqstream

# Local imports.
from IPython.core import ultratb
from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
from IPython.zmq.completer import KernelCompleter
from IPython.zmq.iostream import OutStream
from IPython.zmq.displayhook import DisplayHook

from . import heartmonitor
from .client import Client
from .error import wrap_exception
from .factory import SessionFactory
from .streamsession import StreamSession
from .util import serialize_object, unpack_apply_message, ISO8601, Namespace

def printer(*args):
    pprint(args, stream=sys.__stdout__)


class _Passer:
    """Empty class that implements `send()` that does nothing."""
    def send(self, *args, **kwargs):
        pass
    send_multipart = send


#-----------------------------------------------------------------------------
# Main kernel class
#-----------------------------------------------------------------------------

class Kernel(SessionFactory):

    #---------------------------------------------------------------------------
    # Kernel interface
    #---------------------------------------------------------------------------

    # kwargs:
    int_id = Int(-1, config=True)
    user_ns = Dict(config=True)
    exec_lines = List(config=True)

    control_stream = Instance(zmqstream.ZMQStream)
    task_stream = Instance(zmqstream.ZMQStream)
    iopub_stream = Instance(zmqstream.ZMQStream)
    client = Instance('IPython.zmq.parallel.client.Client')

    # internals
    shell_streams = List()
    compiler = Instance(CommandCompiler, (), {})
    completer = Instance(KernelCompleter)

    aborted = Set()
    shell_handlers = Dict()
    control_handlers = Dict()

    def _set_prefix(self):
        self.prefix = "engine.%s"%self.int_id

    def _connect_completer(self):
        self.completer = KernelCompleter(self.user_ns)

    def __init__(self, **kwargs):
        super(Kernel, self).__init__(**kwargs)
        self._set_prefix()
        self._connect_completer()

        self.on_trait_change(self._set_prefix, 'id')
        self.on_trait_change(self._connect_completer, 'user_ns')

        # Build dict of handlers for message types
        for msg_type in ['execute_request', 'complete_request', 'apply_request',
                         'clear_request']:
            self.shell_handlers[msg_type] = getattr(self, msg_type)

        for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
            self.control_handlers[msg_type] = getattr(self, msg_type)

        self._initial_exec_lines()

    def _wrap_exception(self, method=None):
        e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
        content=wrap_exception(e_info)
        return content

    def _initial_exec_lines(self):
        s = _Passer()
        content = dict(silent=True, user_variables=[],user_expressions=[])
        for line in self.exec_lines:
            self.log.debug("executing initialization: %s"%line)
            content.update({'code':line})
            msg = self.session.msg('execute_request', content)
            self.execute_request(s, [], msg)


    #-------------------- control handlers -----------------------------
    def abort_queues(self):
        for stream in self.shell_streams:
            if stream:
                self.abort_queue(stream)

    def abort_queue(self, stream):
        while True:
            try:
                msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
            except zmq.ZMQError as e:
                if e.errno == zmq.EAGAIN:
                    break
                else:
                    return
            else:
                if msg is None:
                    return
                else:
                    idents,msg = msg

            # assert self.reply_socket.rcvmore(), "Unexpected missing message part."
            # msg = self.reply_socket.recv_json()
            self.log.info("Aborting:")
            self.log.info(str(msg))
            msg_type = msg['msg_type']
            reply_type = msg_type.split('_')[0] + '_reply'
            # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
            # self.reply_socket.send(ident,zmq.SNDMORE)
            # self.reply_socket.send_json(reply_msg)
            reply_msg = self.session.send(stream, reply_type,
                        content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
            self.log.debug(str(reply_msg))
            # We need to wait a bit for requests to come in. This can probably
            # be set shorter for true asynchronous clients.
            time.sleep(0.05)

    def abort_request(self, stream, ident, parent):
        """abort a specific msg by id"""
        msg_ids = parent['content'].get('msg_ids', None)
        if isinstance(msg_ids, basestring):
            msg_ids = [msg_ids]
        if not msg_ids:
            self.abort_queues()
        for mid in msg_ids:
            self.aborted.add(str(mid))

        content = dict(status='ok')
        reply_msg = self.session.send(stream, 'abort_reply', content=content,
                parent=parent, ident=ident)[0]
        self.log.debug(str(reply_msg))

    def shutdown_request(self, stream, ident, parent):
        """kill ourself.  This should really be handled in an external process"""
        try:
            self.abort_queues()
        except:
            content = self._wrap_exception('shutdown')
        else:
            content = dict(parent['content'])
            content['status'] = 'ok'
        msg = self.session.send(stream, 'shutdown_reply',
                                content=content, parent=parent, ident=ident)
        # msg = self.session.send(self.pub_socket, 'shutdown_reply',
        #                         content, parent, ident)
        # print >> sys.__stdout__, msg
        # time.sleep(0.2)
        dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
        dc.start()

    def dispatch_control(self, msg):
        idents,msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except:
            self.log.error("Invalid Message", exc_info=True)
            return

        header = msg['header']
        msg_id = header['msg_id']

        handler = self.control_handlers.get(msg['msg_type'], None)
        if handler is None:
            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
        else:
            handler(self.control_stream, idents, msg)


    #-------------------- queue helpers ------------------------------

    def check_dependencies(self, dependencies):
        if not dependencies:
            return True
        if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
            anyorall = dependencies[0]
            dependencies = dependencies[1]
        else:
            anyorall = 'all'
        results = self.client.get_results(dependencies,status_only=True)
        if results['status'] != 'ok':
            return False

        if anyorall == 'any':
            if not results['completed']:
                return False
        else:
            if results['pending']:
                return False

        return True
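    # Accepted shapes for `dependencies` above (msg_ids are illustrative):
    #     check_dependencies(['id1', 'id2'])           # implicit 'all'
    #     check_dependencies(['all', ['id1', 'id2']])  # both must be complete
    #     check_dependencies(['any', ['id1', 'id2']])  # either one suffices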

    def check_aborted(self, msg_id):
        return msg_id in self.aborted

    #-------------------- queue handlers -----------------------------

    def clear_request(self, stream, idents, parent):
        """Clear our namespace."""
        self.user_ns = {}
        msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                content = dict(status='ok'))
        self._initial_exec_lines()

    def execute_request(self, stream, ident, parent):
        self.log.debug('execute request %s'%parent)
        try:
            code = parent[u'content'][u'code']
        except:
            self.log.error("Got bad msg: %s"%parent, exc_info=True)
            return
        self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
                            ident='%s.pyin'%self.prefix)
        started = datetime.now().strftime(ISO8601)
        try:
            comp_code = self.compiler(code, '<zmq-kernel>')
            # allow for not overriding displayhook
            if hasattr(sys.displayhook, 'set_parent'):
                sys.displayhook.set_parent(parent)
                sys.stdout.set_parent(parent)
                sys.stderr.set_parent(parent)
            exec comp_code in self.user_ns, self.user_ns
        except:
            exc_content = self._wrap_exception('execute')
            # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                            ident='%s.pyerr'%self.prefix)
            reply_content = exc_content
        else:
            reply_content = {'status' : 'ok'}

        reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
                    ident=ident, subheader = dict(started=started))
        self.log.debug(str(reply_msg))
        if reply_msg['content']['status'] == u'error':
            self.abort_queues()

    def complete_request(self, stream, ident, parent):
        matches = {'matches' : self.complete(parent),
                   'status' : 'ok'}
        completion_msg = self.session.send(stream, 'complete_reply',
                                           matches, parent, ident)
        # print >> sys.__stdout__, completion_msg

    def complete(self, msg):
        return self.completer.complete(msg.content.line, msg.content.text)

    def apply_request(self, stream, ident, parent):
        # flush previous reply, so this request won't block it
        stream.flush(zmq.POLLOUT)

        try:
            content = parent[u'content']
            bufs = parent[u'buffers']
            msg_id = parent['header']['msg_id']
            bound = content.get('bound', False)
        except:
            self.log.error("Got bad msg: %s"%parent, exc_info=True)
            return
        # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
        # self.iopub_stream.send(pyin_msg)
        # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
        sub = {'dependencies_met' : True, 'engine' : self.ident,
                'started': datetime.now().strftime(ISO8601)}
        try:
            # allow for not overriding displayhook
            if hasattr(sys.displayhook, 'set_parent'):
                sys.displayhook.set_parent(parent)
                sys.stdout.set_parent(parent)
                sys.stderr.set_parent(parent)
            # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
            working = self.user_ns
            # suffix =
            prefix = "_"+str(msg_id).replace("-","")+"_"
            # if bound:
            #
            # else:
            #     working = dict()
            #     suffix = prefix = "_" # prevent keyword collisions with lambda
            f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
            if bound:
                bound_ns = Namespace(working)
                args = [bound_ns]+list(args)
            # if f.fun
            fname = getattr(f, '__name__', 'f')

            fname = prefix+"f"
            argname = prefix+"args"
            kwargname = prefix+"kwargs"
            resultname = prefix+"result"

            ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
            # print ns
            working.update(ns)
            code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
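            # For an illustrative msg_id of 'ab-12', prefix is '_ab12_' and the
            # generated source is:
            #     _ab12_result=_ab12_f(*_ab12_args,**_ab12_kwargs)
            # i.e. the call runs in the user namespace under collision-resistant
            # temporary names that are popped again in the finally clause below.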
            try:
                exec code in working,working
                result = working.get(resultname)
            finally:
                for key in ns.iterkeys():
                    working.pop(key)
                if bound:
                    working.update(bound_ns)

            packed_result,buf = serialize_object(result)
            result_buf = [packed_result]+buf
        except:
            exc_content = self._wrap_exception('apply')
            # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                                ident='%s.pyerr'%self.prefix)
            reply_content = exc_content
            result_buf = []

            if exc_content['ename'] == 'UnmetDependency':
                sub['dependencies_met'] = False
        else:
            reply_content = {'status' : 'ok'}

        # put 'ok'/'error' status in header, for scheduler introspection:
        sub['status'] = reply_content['status']

        reply_msg = self.session.send(stream, u'apply_reply', reply_content,
                    parent=parent, ident=ident,buffers=result_buf, subheader=sub)

        # if reply_msg['content']['status'] == u'error':
        #     self.abort_queues()

    def dispatch_queue(self, stream, msg):
        self.control_stream.flush()
        idents,msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except:
            self.log.error("Invalid Message", exc_info=True)
            return


        header = msg['header']
        msg_id = header['msg_id']
        if self.check_aborted(msg_id):
            self.aborted.remove(msg_id)
            # is it safe to assume a msg_id will not be resubmitted?
            reply_type = msg['msg_type'].split('_')[0] + '_reply'
            reply_msg = self.session.send(stream, reply_type,
                        content={'status' : 'aborted'}, parent=msg, ident=idents)
            return
        handler = self.shell_handlers.get(msg['msg_type'], None)
        if handler is None:
            self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
        else:
            handler(stream, idents, msg)

    def start(self):
        #### stream mode:
        if self.control_stream:
            self.control_stream.on_recv(self.dispatch_control, copy=False)
            self.control_stream.on_err(printer)

        def make_dispatcher(stream):
            def dispatcher(msg):
                return self.dispatch_queue(stream, msg)
            return dispatcher

        for s in self.shell_streams:
            s.on_recv(make_dispatcher(s), copy=False)
            s.on_err(printer)

        if self.iopub_stream:
            self.iopub_stream.on_err(printer)

        #### while True mode:
        # while True:
        #     idle = True
        #     try:
        #         msg = self.shell_stream.socket.recv_multipart(
        #                     zmq.NOBLOCK, copy=False)
        #     except zmq.ZMQError, e:
        #         if e.errno != zmq.EAGAIN:
        #             raise e
        #     else:
        #         idle=False
        #         self.dispatch_queue(self.shell_stream, msg)
        #
        #     if not self.task_stream.empty():
        #         idle=False
        #         msg = self.task_stream.recv_multipart()
        #         self.dispatch_queue(self.task_stream, msg)
        #     if idle:
        #         # don't busywait
        #         time.sleep(1e-3)

def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
                client_addr=None, loop=None, context=None, key=None,
                out_stream_factory=OutStream, display_hook_factory=DisplayHook):
    """NO LONGER IN USE"""
    # create loop, context, and session:
    if loop is None:
        loop = ioloop.IOLoop.instance()
    if context is None:
        context = zmq.Context()
    c = context
    session = StreamSession(key=key)
    # print (session.key)
    # print (control_addr, shell_addrs, iopub_addr, hb_addrs)

    # create Control Stream
    control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
    control_stream.setsockopt(zmq.IDENTITY, identity)
    control_stream.connect(control_addr)

    # create Shell Streams (MUX, Task, etc.):
    shell_streams = []
    for addr in shell_addrs:
        stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
        stream.setsockopt(zmq.IDENTITY, identity)
        stream.connect(addr)
        shell_streams.append(stream)

    # create iopub stream:
    iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
    iopub_stream.setsockopt(zmq.IDENTITY, identity)
    iopub_stream.connect(iopub_addr)

    # Redirect input streams and set a display hook.
    if out_stream_factory:
        sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
        sys.stdout.topic = 'engine.%i.stdout'%int_id
        sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
        sys.stderr.topic = 'engine.%i.stderr'%int_id
    if display_hook_factory:
        sys.displayhook = display_hook_factory(session, iopub_stream)
        sys.displayhook.topic = 'engine.%i.pyout'%int_id


    # launch heartbeat
    heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
    heart.start()

    # create (optional) Client
    if client_addr:
        client = Client(client_addr, username=identity)
    else:
        client = None

    kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
            shell_streams=shell_streams, iopub_stream=iopub_stream,
            client=client, loop=loop)
    kernel.start()
    return loop, c, kernel
@@ -1,412 +1,418 b''
#!/usr/bin/env python
"""edited session.py to work with streams, and move msg_type to the header
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------


import os
import pprint
import uuid
from datetime import datetime

try:
    import cPickle
    pickle = cPickle
except:
    cPickle = None
    import pickle

import zmq
from zmq.utils import jsonapi
from zmq.eventloop.zmqstream import ZMQStream

from .util import ISO8601

# packer priority: jsonlib[2], cPickle, simplejson/json, pickle
json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
if json_name in ('jsonlib', 'jsonlib2'):
    use_json = True
elif json_name:
    if cPickle is None:
        use_json = True
    else:
        use_json = False
else:
    use_json = False

def squash_unicode(obj):
    if isinstance(obj,dict):
        for key in obj.keys():
            obj[key] = squash_unicode(obj[key])
            if isinstance(key, unicode):
                obj[squash_unicode(key)] = obj.pop(key)
    elif isinstance(obj, list):
        for i,v in enumerate(obj):
            obj[i] = squash_unicode(v)
    elif isinstance(obj, unicode):
        obj = obj.encode('utf8')
    return obj
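
# Illustrative round-trip (assumed input):
#     squash_unicode({u'a': [u'b', {u'c': u'd'}]})  # -> {'a': ['b', {'c': 'd'}]}
# keys and values are coerced to utf8 bytes, recursively.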

json_packer = jsonapi.dumps
json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))

pickle_packer = lambda o: pickle.dumps(o,-1)
pickle_unpacker = pickle.loads

if use_json:
    default_packer = json_packer
    default_unpacker = json_unpacker
else:
    default_packer = pickle_packer
    default_unpacker = pickle_unpacker
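
# Whichever pair is chosen, the contract is the same (illustrative check):
#     default_unpacker(default_packer({'a': 1})) == {'a': 1}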


DELIM="<IDS|MSG>"

class Message(object):
    """A simple message object that maps dict keys to attributes.

    A Message can be created from a dict and a dict from a Message instance
    simply by calling dict(msg_obj)."""

    def __init__(self, msg_dict):
        dct = self.__dict__
        for k, v in dict(msg_dict).iteritems():
            if isinstance(v, dict):
                v = Message(v)
            dct[k] = v

    # Having this iterator lets dict(msg_obj) work out of the box.
    def __iter__(self):
        return iter(self.__dict__.iteritems())

    def __repr__(self):
        return repr(self.__dict__)

    def __str__(self):
        return pprint.pformat(self.__dict__)

    def __contains__(self, k):
        return k in self.__dict__

    def __getitem__(self, k):
        return self.__dict__[k]
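
# Illustrative round-trip (assumed usage):
#     m = Message({'header' : {'msg_id' : 'abc'}})
#     m.header.msg_id    # -> 'abc'; nested dicts become Messages
#     dict(m)            # -> plain dict of the top-level attributes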


def msg_header(msg_id, msg_type, username, session):
    date=datetime.now().strftime(ISO8601)
    return locals()
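
# msg_header returns its locals(), i.e. a dict of the form (values illustrative):
#     {'msg_id' : '...', 'msg_type' : 'execute_request', 'username' : 'user',
#      'session' : '...', 'date' : '2011-...'}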

def extract_header(msg_or_header):
    """Given a message or header, return the header."""
    if not msg_or_header:
        return {}
    try:
        # See if msg_or_header is the entire message.
        h = msg_or_header['header']
    except KeyError:
        try:
            # See if msg_or_header is just the header
            h = msg_or_header['msg_id']
        except KeyError:
            raise
        else:
            h = msg_or_header
    if not isinstance(h, dict):
        h = dict(h)
    return h

class StreamSession(object):
    """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
    debug=False
    key=None

    def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
        if username is None:
            username = os.environ.get('USER','username')
        self.username = username
        if session is None:
            self.session = str(uuid.uuid4())
        else:
            self.session = session
        self.msg_id = str(uuid.uuid4())
        if packer is None:
            self.pack = default_packer
        else:
            if not callable(packer):
                raise TypeError("packer must be callable, not %s"%type(packer))
            self.pack = packer

        if unpacker is None:
            self.unpack = default_unpacker
        else:
            if not callable(unpacker):
                raise TypeError("unpacker must be callable, not %s"%type(unpacker))
            self.unpack = unpacker

        if key is not None and keyfile is not None:
            raise TypeError("Must specify key OR keyfile, not both")
        if keyfile is not None:
            with open(keyfile) as f:
                self.key = f.read().strip()
        else:
            self.key = key
        if isinstance(self.key, unicode):
            self.key = self.key.encode('utf8')
        # print key, keyfile, self.key
        self.none = self.pack({})

    def msg_header(self, msg_type):
        h = msg_header(self.msg_id, msg_type, self.username, self.session)
        self.msg_id = str(uuid.uuid4())
        return h

    def msg(self, msg_type, content=None, parent=None, subheader=None):
        msg = {}
        msg['header'] = self.msg_header(msg_type)
        msg['msg_id'] = msg['header']['msg_id']
        msg['parent_header'] = {} if parent is None else extract_header(parent)
        msg['msg_type'] = msg_type
        msg['content'] = {} if content is None else content
        sub = {} if subheader is None else subheader
        msg['header'].update(sub)
        return msg

    def check_key(self, msg_or_header):
        """Check that a message's header has the right key"""
        if self.key is None:
            return True
        header = extract_header(msg_or_header)
        return header.get('key', None) == self.key


    def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False):
        """Build and send a message via stream or socket.

        Parameters
        ----------

        stream : zmq.Socket or ZMQStream
            the socket-like object used to send the data
        msg_or_type : str or Message/dict
            Normally, msg_or_type will be a msg_type unless a message is being sent more
            than once.

        content : dict or None
            the content of the message (ignored if msg_or_type is a message)
        buffers : list or None
            the already-serialized buffers to be appended to the message
        parent : Message or dict or None
            the parent or parent header describing the parent of this message
        subheader : dict or None
            extra header keys for this message's header
        ident : bytes or list of bytes
            the zmq.IDENTITY routing path
        track : bool
            whether to track.  Only for use with Sockets, because ZMQStream objects cannot track messages.

        Returns
        -------
        msg : message dict
            the constructed message
        (msg,tracker) : (message dict, MessageTracker)
            if track=True, then a 2-tuple will be returned, the first element being the constructed
            message, and the second being the MessageTracker

        """

        if not isinstance(stream, (zmq.Socket, ZMQStream)):
            raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
        elif track and isinstance(stream, ZMQStream):
            raise TypeError("ZMQStream cannot track messages")

        if isinstance(msg_or_type, (Message, dict)):
            # we got a Message, not a msg_type
            # don't build a new Message
            msg = msg_or_type
            content = msg['content']
        else:
            msg = self.msg(msg_or_type, content, parent, subheader)

        buffers = [] if buffers is None else buffers
        to_send = []
        if isinstance(ident, list):
            # accept list of idents
            to_send.extend(ident)
        elif ident is not None:
            to_send.append(ident)
        to_send.append(DELIM)
        if self.key is not None:
            to_send.append(self.key)
        to_send.append(self.pack(msg['header']))
        to_send.append(self.pack(msg['parent_header']))

        if content is None:
            content = self.none
        elif isinstance(content, dict):
            content = self.pack(content)
        elif isinstance(content, bytes):
            # content is already packed, as in a relayed message
            pass
        else:
            raise TypeError("Content incorrect type: %s"%type(content))
        to_send.append(content)
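        # to_send now holds every routed frame except the buffers:
        #     [ident..., DELIM, key (only if self.key is set),
        #      packed header, packed parent_header, packed content]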
253 flag = 0
259 flag = 0
254 if buffers:
260 if buffers:
255 flag = zmq.SNDMORE
261 flag = zmq.SNDMORE
256 _track = False
262 _track = False
257 else:
263 else:
258 _track=track
264 _track=track
259 if track:
265 if track:
260 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
266 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
261 else:
267 else:
262 tracker = stream.send_multipart(to_send, flag, copy=False)
268 tracker = stream.send_multipart(to_send, flag, copy=False)
263 for b in buffers[:-1]:
269 for b in buffers[:-1]:
264 stream.send(b, flag, copy=False)
270 stream.send(b, flag, copy=False)
265 if buffers:
271 if buffers:
266 if track:
272 if track:
267 tracker = stream.send(buffers[-1], copy=False, track=track)
273 tracker = stream.send(buffers[-1], copy=False, track=track)
268 else:
274 else:
269 tracker = stream.send(buffers[-1], copy=False)
275 tracker = stream.send(buffers[-1], copy=False)
270
276
271 # omsg = Message(msg)
277 # omsg = Message(msg)
272 if self.debug:
278 if self.debug:
273 pprint.pprint(msg)
279 pprint.pprint(msg)
274 pprint.pprint(to_send)
280 pprint.pprint(to_send)
275 pprint.pprint(buffers)
281 pprint.pprint(buffers)
276
282
277 msg['tracker'] = tracker
283 msg['tracker'] = tracker
278
284
279 return msg
285 return msg
280
286
281 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
287 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
282 """Send a raw message via ident path.
288 """Send a raw message via ident path.
283
289
284 Parameters
290 Parameters
285 ----------
291 ----------
286 msg : list of sendable buffers"""
292 msg : list of sendable buffers"""
287 to_send = []
293 to_send = []
288 if isinstance(ident, bytes):
294 if isinstance(ident, bytes):
289 ident = [ident]
295 ident = [ident]
290 if ident is not None:
296 if ident is not None:
291 to_send.extend(ident)
297 to_send.extend(ident)
292 to_send.append(DELIM)
298 to_send.append(DELIM)
293 if self.key is not None:
299 if self.key is not None:
294 to_send.append(self.key)
300 to_send.append(self.key)
295 to_send.extend(msg)
301 to_send.extend(msg)
296 stream.send_multipart(to_send, flags, copy=copy)
302 stream.send_multipart(to_send, flags, copy=copy)
297
303
298 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
304 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
299 """receives and unpacks a message
305 """receives and unpacks a message
300 returns [idents], msg"""
306 returns [idents], msg"""
301 if isinstance(socket, ZMQStream):
307 if isinstance(socket, ZMQStream):
302 socket = socket.socket
308 socket = socket.socket
303 try:
309 try:
304 msg = socket.recv_multipart(mode)
310 msg = socket.recv_multipart(mode)
305 except zmq.ZMQError as e:
311 except zmq.ZMQError as e:
306 if e.errno == zmq.EAGAIN:
312 if e.errno == zmq.EAGAIN:
307 # We can convert EAGAIN to None as we know in this case
313 # We can convert EAGAIN to None as we know in this case
308 # recv_multipart won't return None.
314 # recv_multipart won't return None.
309 return None
315 return None
310 else:
316 else:
311 raise
317 raise
312 # return an actual Message object
318 # return an actual Message object
313 # determine the number of idents by trying to unpack them.
319 # determine the number of idents by trying to unpack them.
314 # this is terrible:
320 # this is terrible:
315 idents, msg = self.feed_identities(msg, copy)
321 idents, msg = self.feed_identities(msg, copy)
316 try:
322 try:
317 return idents, self.unpack_message(msg, content=content, copy=copy)
323 return idents, self.unpack_message(msg, content=content, copy=copy)
318 except Exception as e:
324 except Exception as e:
319 print (idents, msg)
325 print (idents, msg)
320 # TODO: handle it
326 # TODO: handle it
321 raise e
327 raise e
322
328
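Because recv defaults to zmq.NOBLOCK and converts EAGAIN into None, callers can poll it cheaply. A minimal sketch, assuming a StreamSession instance named session and a connected zmq socket:

import time

while True:
    result = session.recv(socket)   # None when no message is waiting
    if result is None:
        time.sleep(1e-3)            # back off briefly, then poll again
        continue
    idents, msg = result
    print msg['msg_type']
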
323 def feed_identities(self, msg, copy=True):
329 def feed_identities(self, msg, copy=True):
324 """feed until DELIM is reached, then return the prefix as idents and remainder as
330 """feed until DELIM is reached, then return the prefix as idents and remainder as
325 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
331 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
326
332
327 Parameters
333 Parameters
328 ----------
334 ----------
329 msg : a list of Message or bytes objects
335 msg : a list of Message or bytes objects
330 the message to be split
336 the message to be split
331 copy : bool
337 copy : bool
332 flag determining whether the arguments are bytes or Messages
338 flag determining whether the arguments are bytes or Messages
333
339
334 Returns
340 Returns
335 -------
341 -------
336 (idents,msg) : two lists
342 (idents,msg) : two lists
337 idents will always be a list of bytes - the identity prefix
343 idents will always be a list of bytes - the identity prefix
338 msg will be a list of bytes or Messages, unchanged from input
344 msg will be a list of bytes or Messages, unchanged from input
339 msg should be unpackable via self.unpack_message at this point.
345 msg should be unpackable via self.unpack_message at this point.
340 """
346 """
341 ikey = int(self.key is not None)
347 ikey = int(self.key is not None)
342 minlen = 3 + ikey
348 minlen = 3 + ikey
343 msg = list(msg)
349 msg = list(msg)
344 idents = []
350 idents = []
345 while len(msg) > minlen:
351 while len(msg) > minlen:
346 if copy:
352 if copy:
347 s = msg[0]
353 s = msg[0]
348 else:
354 else:
349 s = msg[0].bytes
355 s = msg[0].bytes
350 if s == DELIM:
356 if s == DELIM:
351 msg.pop(0)
357 msg.pop(0)
352 break
358 break
353 else:
359 else:
354 idents.append(s)
360 idents.append(s)
355 msg.pop(0)
361 msg.pop(0)
356
362
357 return idents, msg
363 return idents, msg
358
364
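A worked example of the split, assuming self.key is None (so minlen is 3), copy=True, and the hypothetical frame values below:

frames = ['engine-0', DELIM, header_bytes, parent_bytes, content_bytes]
idents, rest = session.feed_identities(frames)
# idents == ['engine-0']  (everything before the DELIM sentinel)
# rest   == [header_bytes, parent_bytes, content_bytes]
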
359 def unpack_message(self, msg, content=True, copy=True):
365 def unpack_message(self, msg, content=True, copy=True):
360 """Return a message object from the format
366 """Return a message object from the format
361 sent by self.send.
367 sent by self.send.
362
368
363 Parameters
369 Parameters
364 ----------
370 ----------
365
371
366 content : bool (True)
372 content : bool (True)
367 whether to unpack the content dict (True),
373 whether to unpack the content dict (True),
368 or leave it serialized (False)
374 or leave it serialized (False)
369
375
370 copy : bool (True)
376 copy : bool (True)
371 whether to return the bytes (True),
377 whether to return the bytes (True),
372 or the non-copying Message object in each place (False)
378 or the non-copying Message object in each place (False)
373
379
374 """
380 """
375 ikey = int(self.key is not None)
381 ikey = int(self.key is not None)
376 minlen = 3 + ikey
382 minlen = 3 + ikey
377 message = {}
383 message = {}
378 if not copy:
384 if not copy:
379 for i in range(minlen):
385 for i in range(minlen):
380 msg[i] = msg[i].bytes
386 msg[i] = msg[i].bytes
381 if ikey:
387 if ikey:
382 if not self.key == msg[0]:
388 if not self.key == msg[0]:
383 raise KeyError("Invalid Session Key: %s"%msg[0])
389 raise KeyError("Invalid Session Key: %s"%msg[0])
384 if not len(msg) >= minlen:
390 if not len(msg) >= minlen:
385 raise TypeError("malformed message, must have at least %i elements"%minlen)
391 raise TypeError("malformed message, must have at least %i elements"%minlen)
386 message['header'] = self.unpack(msg[ikey+0])
392 message['header'] = self.unpack(msg[ikey+0])
387 message['msg_type'] = message['header']['msg_type']
393 message['msg_type'] = message['header']['msg_type']
388 message['parent_header'] = self.unpack(msg[ikey+1])
394 message['parent_header'] = self.unpack(msg[ikey+1])
389 if content:
395 if content:
390 message['content'] = self.unpack(msg[ikey+2])
396 message['content'] = self.unpack(msg[ikey+2])
391 else:
397 else:
392 message['content'] = msg[ikey+2]
398 message['content'] = msg[ikey+2]
393
399
394 message['buffers'] = msg[ikey+3:] # [ m.buffer for m in msg[3:] ]
400 message['buffers'] = msg[ikey+3:] # [ m.buffer for m in msg[3:] ]
395 return message
401 return message
396
402
397
403
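To summarize the indexing above: ikey shifts every offset by one when a session key is in use. A comments-only sketch of the frame layout unpack_message expects:

# ikey == 0 (no key):        ikey == 1 (keyed session):
#   msg[0]  -> header          msg[0]  -> key (must equal self.key)
#   msg[1]  -> parent_header   msg[1]  -> header
#   msg[2]  -> content         msg[2]  -> parent_header
#   msg[3:] -> buffers         msg[3]  -> content, msg[4:] -> buffers
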
398 def test_msg2obj():
404 def test_msg2obj():
399 am = dict(x=1)
405 am = dict(x=1)
400 ao = Message(am)
406 ao = Message(am)
401 assert ao.x == am['x']
407 assert ao.x == am['x']
402
408
403 am['y'] = dict(z=1)
409 am['y'] = dict(z=1)
404 ao = Message(am)
410 ao = Message(am)
405 assert ao.y.z == am['y']['z']
411 assert ao.y.z == am['y']['z']
406
412
407 k1, k2 = 'y', 'z'
413 k1, k2 = 'y', 'z'
408 assert ao[k1][k2] == am[k1][k2]
414 assert ao[k1][k2] == am[k1][k2]
409
415
410 am2 = dict(ao)
416 am2 = dict(ao)
411 assert am['x'] == am2['x']
417 assert am['x'] == am2['x']
412 assert am['y']['z'] == am2['y']['z']
418 assert am['y']['z'] == am2['y']['z']
@@ -1,100 +1,106 b''
1 """Thread for popping Tasks from zmq to Python Queue"""
1 """Thread for popping Tasks from zmq to Python Queue"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010-2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
2
8
3
9
4 import time
10 import time
5 from threading import Thread
11 from threading import Thread
6
12
7 try:
13 try:
8 from queue import Queue
14 from queue import Queue
9 except ImportError:
15 except ImportError:
10 from Queue import Queue
16 from Queue import Queue
11
17
12 import zmq
18 import zmq
13 from zmq.core.poll import _poll as poll
19 from zmq.core.poll import _poll as poll
14 from zmq.devices import ThreadDevice
20 from zmq.devices import ThreadDevice
15 from IPython.zmq.parallel import streamsession as ss
21 from IPython.zmq.parallel import streamsession as ss
16
22
17
23
18 class QueueStream(object):
24 class QueueStream(object):
19 def __init__(self, in_queue, out_queue):
25 def __init__(self, in_queue, out_queue):
20 self.in_queue = in_queue
26 self.in_queue = in_queue
21 self.out_queue = out_queue
27 self.out_queue = out_queue
22
28
23 def send_multipart(self, *args, **kwargs):
29 def send_multipart(self, *args, **kwargs):
24 while self.out_queue.full():
30 while self.out_queue.full():
25 time.sleep(1e-3)
31 time.sleep(1e-3)
26 self.out_queue.put(('send_multipart', args, kwargs))
32 self.out_queue.put(('send_multipart', args, kwargs))
27
33
28 def send(self, *args, **kwargs):
34 def send(self, *args, **kwargs):
29 while self.out_queue.full():
35 while self.out_queue.full():
30 time.sleep(1e-3)
36 time.sleep(1e-3)
31 self.out_queue.put(('send', args, kwargs))
37 self.out_queue.put(('send', args, kwargs))
32
38
33 def recv_multipart(self):
39 def recv_multipart(self):
34 return self.in_queue.get()
40 return self.in_queue.get()
35
41
36 def empty(self):
42 def empty(self):
37 return self.in_queue.empty()
43 return self.in_queue.empty()
38
44
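QueueStream implements just enough of the ZMQStream interface (send, send_multipart, recv_multipart) for a StreamSession to read and write plain Python Queues. A minimal sketch, using the Queue class imported above:

in_q, out_q = Queue(10), Queue(10)
qs = QueueStream(in_q, out_q)
qs.send_multipart(['frame0', 'frame1'])
meth, args, kwargs = out_q.get()
# meth == 'send_multipart' and args == (['frame0', 'frame1'],)
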
39 class TaskThread(ThreadDevice):
45 class TaskThread(ThreadDevice):
40 """Class for popping Tasks from C-ZMQ->Python Queue"""
46 """Class for popping Tasks from C-ZMQ->Python Queue"""
41 max_qsize = 100
47 max_qsize = 100
42 in_socket = None
48 in_socket = None
43 out_socket = None
49 out_socket = None
44 # queue = None
50 # queue = None
45
51
46 def __init__(self, queue_type, mon_type, engine_id, max_qsize=100):
52 def __init__(self, queue_type, mon_type, engine_id, max_qsize=100):
47 ThreadDevice.__init__(self, 0, queue_type, mon_type)
53 ThreadDevice.__init__(self, 0, queue_type, mon_type)
48 self.session = ss.StreamSession(username='TaskNotifier[%s]'%engine_id)
54 self.session = ss.StreamSession(username='TaskNotifier[%s]'%engine_id)
49 self.engine_id = engine_id
55 self.engine_id = engine_id
50 self.in_queue = Queue(max_qsize)
56 self.in_queue = Queue(max_qsize)
51 self.out_queue = Queue(max_qsize)
57 self.out_queue = Queue(max_qsize)
52 self.max_qsize = max_qsize
58 self.max_qsize = max_qsize
53
59
54 @property
60 @property
55 def queues(self):
61 def queues(self):
56 return self.in_queue, self.out_queue
62 return self.in_queue, self.out_queue
57
63
58 @property
64 @property
59 def can_recv(self):
65 def can_recv(self):
60 # print self.in_queue.full(), poll((self.queue_socket, zmq.POLLIN),1e-3)
66 # print self.in_queue.full(), poll((self.queue_socket, zmq.POLLIN),1e-3)
61 return (not self.in_queue.full()) and poll([(self.queue_socket, zmq.POLLIN)], 1e-3 )
67 return (not self.in_queue.full()) and poll([(self.queue_socket, zmq.POLLIN)], 1e-3 )
62
68
63 @property
69 @property
64 def can_send(self):
70 def can_send(self):
65 return not self.out_queue.empty()
71 return not self.out_queue.empty()
66
72
67 def run(self):
73 def run(self):
68 print 'running'
74 print 'running'
69 self.queue_socket,self.mon_socket = self._setup_sockets()
75 self.queue_socket,self.mon_socket = self._setup_sockets()
70 print 'setup'
76 print 'setup'
71
77
72 while True:
78 while True:
73 while not self.can_send and not self.can_recv:
79 while not self.can_send and not self.can_recv:
74 # print 'idle'
80 # print 'idle'
75 # nothing to do, wait
81 # nothing to do, wait
76 time.sleep(1e-3)
82 time.sleep(1e-3)
77 while self.can_send:
83 while self.can_send:
78 # flush out queue
84 # flush out queue
79 print 'flushing...'
85 print 'flushing...'
80 meth, args, kwargs = self.out_queue.get()
86 meth, args, kwargs = self.out_queue.get()
81 getattr(self.queue_socket, meth)(*args, **kwargs)
87 getattr(self.queue_socket, meth)(*args, **kwargs)
82 print 'flushed'
88 print 'flushed'
83
89
84 if self.can_recv:
90 if self.can_recv:
85 print 'recving'
91 print 'recving'
86 # get another job from zmq
92 # get another job from zmq
87 msg = self.queue_socket.recv_multipart(0, copy=False)
93 msg = self.queue_socket.recv_multipart(0, copy=False)
88 # put it in the Queue
94 # put it in the Queue
89 self.in_queue.put(msg)
95 self.in_queue.put(msg)
90 idents,msg = self.session.feed_identities(msg, copy=False)
96 idents,msg = self.session.feed_identities(msg, copy=False)
91 msg = self.session.unpack_message(msg, content=False, copy=False)
97 msg = self.session.unpack_message(msg, content=False, copy=False)
92 # notify the Controller that we got it
98 # notify the Controller that we got it
93 self.mon_socket.send('tracktask', zmq.SNDMORE)
99 self.mon_socket.send('tracktask', zmq.SNDMORE)
94 header = msg['header']
100 header = msg['header']
95 msg_id = header['msg_id']
101 msg_id = header['msg_id']
96 content = dict(engine_id=self.engine_id, msg_id = msg_id)
102 content = dict(engine_id=self.engine_id, msg_id = msg_id)
97 self.session.send(self.mon_socket, 'task_receipt', content=content)
103 self.session.send(self.mon_socket, 'task_receipt', content=content)
98 print 'recvd'
104 print 'recvd'
99
105
100 No newline at end of file
106
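A sketch of intended use, assuming the pyzmq Device socket-setup API (bind_in/connect_out) and placeholder addresses; none of this appears in the file itself:

tt = TaskThread(zmq.PULL, zmq.PUSH, engine_id=0)
tt.bind_in('tcp://127.0.0.1:10101')       # task source
tt.connect_out('tcp://127.0.0.1:10102')   # monitor/controller socket
tt.start()
in_q, out_q = tt.queues   # run() fills in_q from zmq and flushes out_q to zmq
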
@@ -1,299 +1,318 b''
1 """some generic utilities for dealing with classes, urls, and serialization"""
1 """some generic utilities for dealing with classes, urls, and serialization"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010-2011 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
8
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
12
2 import re
13 import re
3 import socket
14 import socket
4
15
5 try:
16 try:
6 import cPickle
17 import cPickle
7 pickle = cPickle
18 pickle = cPickle
8 except ImportError:
19 except ImportError:
9 cPickle = None
20 cPickle = None
10 import pickle
21 import pickle
11
22
12
23
13 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
24 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
14 from IPython.utils.newserialized import serialize, unserialize
25 from IPython.utils.newserialized import serialize, unserialize
15
26
16 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
27 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
17
28
29 #-----------------------------------------------------------------------------
30 # Classes
31 #-----------------------------------------------------------------------------
32
18 class Namespace(dict):
33 class Namespace(dict):
19 """Subclass of dict for attribute access to keys."""
34 """Subclass of dict for attribute access to keys."""
20
35
21 def __getattr__(self, key):
36 def __getattr__(self, key):
22 """getattr aliased to getitem"""
37 """getattr aliased to getitem"""
23 if key in self.iterkeys():
38 if key in self.iterkeys():
24 return self[key]
39 return self[key]
25 else:
40 else:
26 raise NameError(key)
41 raise NameError(key)
27
42
28 def __setattr__(self, key, value):
43 def __setattr__(self, key, value):
29 """setattr aliased to setitem, with strict"""
44 """setattr aliased to setitem, with strict"""
30 if hasattr(dict, key):
45 if hasattr(dict, key):
31 raise KeyError("Cannot override dict keys %r"%key)
46 raise KeyError("Cannot override dict keys %r"%key)
32 self[key] = value
47 self[key] = value
33
48
34
49
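For example (nothing here shadows a dict method name):

ns = Namespace(a=1)
ns.b = 2                 # setattr aliased to setitem
assert ns.a == 1 and ns['b'] == 2
# ns.keys = 3            # would raise KeyError: 'keys' is a dict attribute
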
35 class ReverseDict(dict):
50 class ReverseDict(dict):
36 """simple double-keyed subset of dict methods."""
51 """simple double-keyed subset of dict methods."""
37
52
38 def __init__(self, *args, **kwargs):
53 def __init__(self, *args, **kwargs):
39 dict.__init__(self, *args, **kwargs)
54 dict.__init__(self, *args, **kwargs)
40 self._reverse = dict()
55 self._reverse = dict()
41 for key, value in self.iteritems():
56 for key, value in self.iteritems():
42 self._reverse[value] = key
57 self._reverse[value] = key
43
58
44 def __getitem__(self, key):
59 def __getitem__(self, key):
45 try:
60 try:
46 return dict.__getitem__(self, key)
61 return dict.__getitem__(self, key)
47 except KeyError:
62 except KeyError:
48 return self._reverse[key]
63 return self._reverse[key]
49
64
50 def __setitem__(self, key, value):
65 def __setitem__(self, key, value):
51 if key in self._reverse:
66 if key in self._reverse:
52 raise KeyError("Can't have key %r on both sides!"%key)
67 raise KeyError("Can't have key %r on both sides!"%key)
53 dict.__setitem__(self, key, value)
68 dict.__setitem__(self, key, value)
54 self._reverse[value] = key
69 self._reverse[value] = key
55
70
56 def pop(self, key):
71 def pop(self, key):
57 value = dict.pop(self, key)
72 value = dict.pop(self, key)
58 self._reverse.pop(value)
73 self._reverse.pop(value)
59 return value
74 return value
60
75
61 def get(self, key, default=None):
76 def get(self, key, default=None):
62 try:
77 try:
63 return self[key]
78 return self[key]
64 except KeyError:
79 except KeyError:
65 return default
80 return default
66
81
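A short example of the double-keyed behavior:

rd = ReverseDict()
rd['queue'] = 'tcp://127.0.0.1:10101'
assert rd['queue'] == 'tcp://127.0.0.1:10101'
assert rd['tcp://127.0.0.1:10101'] == 'queue'   # reverse lookup
rd.pop('queue')   # removes the entry from both directions
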
82 #-----------------------------------------------------------------------------
83 # Functions
84 #-----------------------------------------------------------------------------
85
67 def validate_url(url):
86 def validate_url(url):
68 """validate a url for zeromq"""
87 """validate a url for zeromq"""
69 if not isinstance(url, basestring):
88 if not isinstance(url, basestring):
70 raise TypeError("url must be a string, not %r"%type(url))
89 raise TypeError("url must be a string, not %r"%type(url))
71 url = url.lower()
90 url = url.lower()
72
91
73 proto_addr = url.split('://')
92 proto_addr = url.split('://')
74 assert len(proto_addr) == 2, 'Invalid url: %r'%url
93 assert len(proto_addr) == 2, 'Invalid url: %r'%url
75 proto, addr = proto_addr
94 proto, addr = proto_addr
76 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
95 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
77
96
78 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
97 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
79 # author: Remi Sabourin
98 # author: Remi Sabourin
80 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
99 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
81
100
82 if proto == 'tcp':
101 if proto == 'tcp':
83 lis = addr.split(':')
102 lis = addr.split(':')
84 assert len(lis) == 2, 'Invalid url: %r'%url
103 assert len(lis) == 2, 'Invalid url: %r'%url
85 addr,s_port = lis
104 addr,s_port = lis
86 try:
105 try:
87 port = int(s_port)
106 port = int(s_port)
88 except ValueError:
107 except ValueError:
89 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
108 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
90
109
91 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
110 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
92
111
93 else:
112 else:
94 # only validate tcp urls currently
113 # only validate tcp urls currently
95 pass
114 pass
96
115
97 return True
116 return True
98
117
99
118
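A few example calls, following the rules above:

validate_url('tcp://127.0.0.1:10101')    # True
validate_url('tcp://*:5555')             # True: '*' is allowed for tcp
validate_url('ipc:///tmp/engine.sock')   # True: non-tcp urls pass unchecked
# validate_url('tcp://127.0.0.1')        # would raise AssertionError: no port
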
100 def validate_url_container(container):
119 def validate_url_container(container):
101 """validate a potentially nested collection of urls."""
120 """validate a potentially nested collection of urls."""
102 if isinstance(container, basestring):
121 if isinstance(container, basestring):
103 url = container
122 url = container
104 return validate_url(url)
123 return validate_url(url)
105 elif isinstance(container, dict):
124 elif isinstance(container, dict):
106 container = container.itervalues()
125 container = container.itervalues()
107
126
108 for element in container:
127 for element in container:
109 validate_url_container(element)
128 validate_url_container(element)
110
129
111
130
112 def split_url(url):
131 def split_url(url):
113 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
132 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
114 proto_addr = url.split('://')
133 proto_addr = url.split('://')
115 assert len(proto_addr) == 2, 'Invalid url: %r'%url
134 assert len(proto_addr) == 2, 'Invalid url: %r'%url
116 proto, addr = proto_addr
135 proto, addr = proto_addr
117 lis = addr.split(':')
136 lis = addr.split(':')
118 assert len(lis) == 2, 'Invalid url: %r'%url
137 assert len(lis) == 2, 'Invalid url: %r'%url
119 addr,s_port = lis
138 addr,s_port = lis
120 return proto,addr,s_port
139 return proto,addr,s_port
121
140
122 def disambiguate_ip_address(ip, location=None):
141 def disambiguate_ip_address(ip, location=None):
123 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
142 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
124 ones, based on the location (default interpretation of location is localhost)."""
143 ones, based on the location (default interpretation of location is localhost)."""
125 if ip in ('0.0.0.0', '*'):
144 if ip in ('0.0.0.0', '*'):
126 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
145 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
127 if location is None or location in external_ips:
146 if location is None or location in external_ips:
128 ip='127.0.0.1'
147 ip='127.0.0.1'
129 elif location:
148 elif location:
130 return location
149 return location
131 return ip
150 return ip
132
151
133 def disambiguate_url(url, location=None):
152 def disambiguate_url(url, location=None):
134 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
153 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
135 ones, based on the location (default interpretation is localhost).
154 ones, based on the location (default interpretation is localhost).
136
155
137 This is for zeromq urls, such as tcp://*:10101."""
156 This is for zeromq urls, such as tcp://*:10101."""
138 try:
157 try:
139 proto,ip,port = split_url(url)
158 proto,ip,port = split_url(url)
140 except AssertionError:
159 except AssertionError:
141 # probably not tcp url; could be ipc, etc.
160 # probably not tcp url; could be ipc, etc.
142 return url
161 return url
143
162
144 ip = disambiguate_ip_address(ip,location)
163 ip = disambiguate_ip_address(ip,location)
145
164
146 return "%s://%s:%s"%(proto,ip,port)
165 return "%s://%s:%s"%(proto,ip,port)
147
166
148
167
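For example (the second call assumes 10.0.0.5 is not one of this host's own interfaces):

disambiguate_url('tcp://*:10101')                # -> 'tcp://127.0.0.1:10101'
disambiguate_url('tcp://*:10101', '10.0.0.5')    # -> 'tcp://10.0.0.5:10101'
disambiguate_url('ipc:///tmp/engine.sock')       # not tcp: returned unchanged
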
149 def rekey(dikt):
168 def rekey(dikt):
150 """Rekey a dict that has been forced to use str keys where there should be
169 """Rekey a dict that has been forced to use str keys where there should be
151 ints by json. This belongs in the jsonutil added by fperez."""
170 ints by json. This belongs in the jsonutil added by fperez."""
152 for k in dikt.iterkeys():
171 for k in dikt.iterkeys():
153 if isinstance(k, str):
172 if isinstance(k, str):
154 ik=fk=None
173 ik=fk=None
155 try:
174 try:
156 ik = int(k)
175 ik = int(k)
157 except ValueError:
176 except ValueError:
158 try:
177 try:
159 fk = float(k)
178 fk = float(k)
160 except ValueError:
179 except ValueError:
161 continue
180 continue
162 if ik is not None:
181 if ik is not None:
163 nk = ik
182 nk = ik
164 else:
183 else:
165 nk = fk
184 nk = fk
166 if nk in dikt:
185 if nk in dikt:
167 raise KeyError("already have key %r"%nk)
186 raise KeyError("already have key %r"%nk)
168 dikt[nk] = dikt.pop(k)
187 dikt[nk] = dikt.pop(k)
169 return dikt
188 return dikt
170
189
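The intended behavior, sketched below; note that rekey mutates its argument in place:

d = rekey({'1': 'a', '2.5': 'b', 'x': 'c'})
# d == {1: 'a', 2.5: 'b', 'x': 'c'}: numeric-looking str keys become ints/floats
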
171 def serialize_object(obj, threshold=64e-6):
190 def serialize_object(obj, threshold=64e-6):
172 """Serialize an object into a list of sendable buffers.
191 """Serialize an object into a list of sendable buffers.
173
192
174 Parameters
193 Parameters
175 ----------
194 ----------
176
195
177 obj : object
196 obj : object
178 The object to be serialized
197 The object to be serialized
179 threshold : float
198 threshold : float
180 The threshold for not double-pickling the content.
199 The threshold for not double-pickling the content.
181
200
182
201
183 Returns
202 Returns
184 -------
203 -------
185 ('pmd', [bufs]) :
204 ('pmd', [bufs]) :
186 where pmd is the pickled metadata wrapper,
205 where pmd is the pickled metadata wrapper,
187 bufs is a list of data buffers
206 bufs is a list of data buffers
188 """
207 """
189 databuffers = []
208 databuffers = []
190 if isinstance(obj, (list, tuple)):
209 if isinstance(obj, (list, tuple)):
191 clist = canSequence(obj)
210 clist = canSequence(obj)
192 slist = map(serialize, clist)
211 slist = map(serialize, clist)
193 for s in slist:
212 for s in slist:
194 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
213 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
195 databuffers.append(s.getData())
214 databuffers.append(s.getData())
196 s.data = None
215 s.data = None
197 return pickle.dumps(slist,-1), databuffers
216 return pickle.dumps(slist,-1), databuffers
198 elif isinstance(obj, dict):
217 elif isinstance(obj, dict):
199 sobj = {}
218 sobj = {}
200 for k in sorted(obj.iterkeys()):
219 for k in sorted(obj.iterkeys()):
201 s = serialize(can(obj[k]))
220 s = serialize(can(obj[k]))
202 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
221 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
203 databuffers.append(s.getData())
222 databuffers.append(s.getData())
204 s.data = None
223 s.data = None
205 sobj[k] = s
224 sobj[k] = s
206 return pickle.dumps(sobj,-1),databuffers
225 return pickle.dumps(sobj,-1),databuffers
207 else:
226 else:
208 s = serialize(can(obj))
227 s = serialize(can(obj))
209 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
228 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
210 databuffers.append(s.getData())
229 databuffers.append(s.getData())
211 s.data = None
230 s.data = None
212 return pickle.dumps(s,-1),databuffers
231 return pickle.dumps(s,-1),databuffers
213
232
214
233
215 def unserialize_object(bufs):
234 def unserialize_object(bufs):
216 """reconstruct an object serialized by serialize_object from data buffers."""
235 """reconstruct an object serialized by serialize_object from data buffers."""
217 bufs = list(bufs)
236 bufs = list(bufs)
218 sobj = pickle.loads(bufs.pop(0))
237 sobj = pickle.loads(bufs.pop(0))
219 if isinstance(sobj, (list, tuple)):
238 if isinstance(sobj, (list, tuple)):
220 for s in sobj:
239 for s in sobj:
221 if s.data is None:
240 if s.data is None:
222 s.data = bufs.pop(0)
241 s.data = bufs.pop(0)
223 return uncanSequence(map(unserialize, sobj)), bufs
242 return uncanSequence(map(unserialize, sobj)), bufs
224 elif isinstance(sobj, dict):
243 elif isinstance(sobj, dict):
225 newobj = {}
244 newobj = {}
226 for k in sorted(sobj.iterkeys()):
245 for k in sorted(sobj.iterkeys()):
227 s = sobj[k]
246 s = sobj[k]
228 if s.data is None:
247 if s.data is None:
229 s.data = bufs.pop(0)
248 s.data = bufs.pop(0)
230 newobj[k] = uncan(unserialize(s))
249 newobj[k] = uncan(unserialize(s))
231 return newobj, bufs
250 return newobj, bufs
232 else:
251 else:
233 if sobj.data is None:
252 if sobj.data is None:
234 sobj.data = bufs.pop(0)
253 sobj.data = bufs.pop(0)
235 return uncan(unserialize(sobj)), bufs
254 return uncan(unserialize(sobj)), bufs
236
255
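A roundtrip sketch: the pickled metadata frame goes first, followed by any extracted buffers (for small objects like these, none are extracted):

pmd, bufs = serialize_object(['hello', range(5)])
obj, remaining = unserialize_object([pmd] + bufs)
# obj == ['hello', [0, 1, 2, 3, 4]] and remaining == []
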
237 def pack_apply_message(f, args, kwargs, threshold=64e-6):
256 def pack_apply_message(f, args, kwargs, threshold=64e-6):
238 """pack up a function, args, and kwargs to be sent over the wire
257 """pack up a function, args, and kwargs to be sent over the wire
239 as a series of buffers. Any object whose data is larger than `threshold`
258 as a series of buffers. Any object whose data is larger than `threshold`
240 will not have their data copied (currently only numpy arrays support zero-copy)"""
259 will not have their data copied (currently only numpy arrays support zero-copy)"""
241 msg = [pickle.dumps(can(f),-1)]
260 msg = [pickle.dumps(can(f),-1)]
242 databuffers = [] # for large objects
261 databuffers = [] # for large objects
243 sargs, bufs = serialize_object(args,threshold)
262 sargs, bufs = serialize_object(args,threshold)
244 msg.append(sargs)
263 msg.append(sargs)
245 databuffers.extend(bufs)
264 databuffers.extend(bufs)
246 skwargs, bufs = serialize_object(kwargs,threshold)
265 skwargs, bufs = serialize_object(kwargs,threshold)
247 msg.append(skwargs)
266 msg.append(skwargs)
248 databuffers.extend(bufs)
267 databuffers.extend(bufs)
249 msg.extend(databuffers)
268 msg.extend(databuffers)
250 return msg
269 return msg
251
270
252 def unpack_apply_message(bufs, g=None, copy=True):
271 def unpack_apply_message(bufs, g=None, copy=True):
253 """unpack f,args,kwargs from buffers packed by pack_apply_message()
272 """unpack f,args,kwargs from buffers packed by pack_apply_message()
254 Returns: original f,args,kwargs"""
273 Returns: original f,args,kwargs"""
255 bufs = list(bufs) # allow us to pop
274 bufs = list(bufs) # allow us to pop
256 assert len(bufs) >= 3, "not enough buffers!"
275 assert len(bufs) >= 3, "not enough buffers!"
257 if not copy:
276 if not copy:
258 for i in range(3):
277 for i in range(3):
259 bufs[i] = bufs[i].bytes
278 bufs[i] = bufs[i].bytes
260 cf = pickle.loads(bufs.pop(0))
279 cf = pickle.loads(bufs.pop(0))
261 sargs = list(pickle.loads(bufs.pop(0)))
280 sargs = list(pickle.loads(bufs.pop(0)))
262 skwargs = dict(pickle.loads(bufs.pop(0)))
281 skwargs = dict(pickle.loads(bufs.pop(0)))
263 # print sargs, skwargs
282 # print sargs, skwargs
264 f = uncan(cf, g)
283 f = uncan(cf, g)
265 for sa in sargs:
284 for sa in sargs:
266 if sa.data is None:
285 if sa.data is None:
267 m = bufs.pop(0)
286 m = bufs.pop(0)
268 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
287 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
269 if copy:
288 if copy:
270 sa.data = buffer(m)
289 sa.data = buffer(m)
271 else:
290 else:
272 sa.data = m.buffer
291 sa.data = m.buffer
273 else:
292 else:
274 if copy:
293 if copy:
275 sa.data = m
294 sa.data = m
276 else:
295 else:
277 sa.data = m.bytes
296 sa.data = m.bytes
278
297
279 args = uncanSequence(map(unserialize, sargs), g)
298 args = uncanSequence(map(unserialize, sargs), g)
280 kwargs = {}
299 kwargs = {}
281 for k in sorted(skwargs.iterkeys()):
300 for k in sorted(skwargs.iterkeys()):
282 sa = skwargs[k]
301 sa = skwargs[k]
283 if sa.data is None:
302 if sa.data is None:
284 m = bufs.pop(0)
303 m = bufs.pop(0)
285 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
304 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
286 if copy:
305 if copy:
287 sa.data = buffer(m)
306 sa.data = buffer(m)
288 else:
307 else:
289 sa.data = m.buffer
308 sa.data = m.buffer
290 else:
309 else:
291 if copy:
310 if copy:
292 sa.data = m
311 sa.data = m
293 else:
312 else:
294 sa.data = m.bytes
313 sa.data = m.bytes
295
314
296 kwargs[k] = uncan(unserialize(sa), g)
315 kwargs[k] = uncan(unserialize(sa), g)
297
316
298 return f,args,kwargs
317 return f,args,kwargs
299
318
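A pack/unpack roundtrip, as a minimal sketch:

def add(a, b):
    return a + b

msg = pack_apply_message(add, (1, 2), {})
f, args, kwargs = unpack_apply_message(msg)
assert f(*args, **kwargs) == 3
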