##// END OF EJS Templates
add timeout for unmet dependencies in task scheduler
MinRK -
Show More
@@ -1,200 +1,205
1 """AsyncResult objects for the client"""
1 """AsyncResult objects for the client"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 from IPython.external.decorator import decorator
14 import error
14 import error
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Classes
17 # Classes
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 @decorator
20 @decorator
21 def check_ready(f, self, *args, **kwargs):
21 def check_ready(f, self, *args, **kwargs):
22 """Call spin() to sync state prior to calling the method."""
22 """Call spin() to sync state prior to calling the method."""
23 self.wait(0)
23 self.wait(0)
24 if not self._ready:
24 if not self._ready:
25 raise error.TimeoutError("result not ready")
25 raise error.TimeoutError("result not ready")
26 return f(self, *args, **kwargs)
26 return f(self, *args, **kwargs)
27
27
28 class AsyncResult(object):
28 class AsyncResult(object):
29 """Class for representing results of non-blocking calls.
29 """Class for representing results of non-blocking calls.
30
30
31 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
31 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
32 """
32 """
33 def __init__(self, client, msg_ids, fname=''):
33 def __init__(self, client, msg_ids, fname=''):
34 self._client = client
34 self._client = client
35 self.msg_ids = msg_ids
35 self.msg_ids = msg_ids
36 self._fname=fname
36 self._fname=fname
37 self._ready = False
37 self._ready = False
38 self._success = None
38 self._success = None
39 self._flatten_result = len(msg_ids) == 1
39 self._single_result = len(msg_ids) == 1
40
40
41 def __repr__(self):
41 def __repr__(self):
42 if self._ready:
42 if self._ready:
43 return "<%s: finished>"%(self.__class__.__name__)
43 return "<%s: finished>"%(self.__class__.__name__)
44 else:
44 else:
45 return "<%s: %s>"%(self.__class__.__name__,self._fname)
45 return "<%s: %s>"%(self.__class__.__name__,self._fname)
46
46
47
47
48 def _reconstruct_result(self, res):
48 def _reconstruct_result(self, res):
49 """
49 """
50 Override me in subclasses for turning a list of results
50 Override me in subclasses for turning a list of results
51 into the expected form.
51 into the expected form.
52 """
52 """
53 if self._flatten_result:
53 if self._single_result:
54 return res[0]
54 return res[0]
55 else:
55 else:
56 return res
56 return res
57
57
58 def get(self, timeout=-1):
58 def get(self, timeout=-1):
59 """Return the result when it arrives.
59 """Return the result when it arrives.
60
60
61 If `timeout` is not ``None`` and the result does not arrive within
61 If `timeout` is not ``None`` and the result does not arrive within
62 `timeout` seconds then ``TimeoutError`` is raised. If the
62 `timeout` seconds then ``TimeoutError`` is raised. If the
63 remote call raised an exception then that exception will be reraised
63 remote call raised an exception then that exception will be reraised
64 by get().
64 by get().
65 """
65 """
66 if not self.ready():
66 if not self.ready():
67 self.wait(timeout)
67 self.wait(timeout)
68
68
69 if self._ready:
69 if self._ready:
70 if self._success:
70 if self._success:
71 return self._result
71 return self._result
72 else:
72 else:
73 raise self._exception
73 raise self._exception
74 else:
74 else:
75 raise error.TimeoutError("Result not ready.")
75 raise error.TimeoutError("Result not ready.")
76
76
77 def ready(self):
77 def ready(self):
78 """Return whether the call has completed."""
78 """Return whether the call has completed."""
79 if not self._ready:
79 if not self._ready:
80 self.wait(0)
80 self.wait(0)
81 return self._ready
81 return self._ready
82
82
83 def wait(self, timeout=-1):
83 def wait(self, timeout=-1):
84 """Wait until the result is available or until `timeout` seconds pass.
84 """Wait until the result is available or until `timeout` seconds pass.
85 """
85 """
86 if self._ready:
86 if self._ready:
87 return
87 return
88 self._ready = self._client.barrier(self.msg_ids, timeout)
88 self._ready = self._client.barrier(self.msg_ids, timeout)
89 if self._ready:
89 if self._ready:
90 try:
90 try:
91 results = map(self._client.results.get, self.msg_ids)
91 results = map(self._client.results.get, self.msg_ids)
92 self._result = results
92 self._result = results
93 if self._single_result:
94 r = results[0]
95 if isinstance(r, Exception):
96 raise r
97 else:
93 results = error.collect_exceptions(results, self._fname)
98 results = error.collect_exceptions(results, self._fname)
94 self._result = self._reconstruct_result(results)
99 self._result = self._reconstruct_result(results)
95 except Exception, e:
100 except Exception, e:
96 self._exception = e
101 self._exception = e
97 self._success = False
102 self._success = False
98 else:
103 else:
99 self._success = True
104 self._success = True
100 finally:
105 finally:
101 self._metadata = map(self._client.metadata.get, self.msg_ids)
106 self._metadata = map(self._client.metadata.get, self.msg_ids)
102
107
103
108
104 def successful(self):
109 def successful(self):
105 """Return whether the call completed without raising an exception.
110 """Return whether the call completed without raising an exception.
106
111
107 Will raise ``AssertionError`` if the result is not ready.
112 Will raise ``AssertionError`` if the result is not ready.
108 """
113 """
109 assert self._ready
114 assert self._ready
110 return self._success
115 return self._success
111
116
112 #----------------------------------------------------------------
117 #----------------------------------------------------------------
113 # Extra methods not in mp.pool.AsyncResult
118 # Extra methods not in mp.pool.AsyncResult
114 #----------------------------------------------------------------
119 #----------------------------------------------------------------
115
120
116 def get_dict(self, timeout=-1):
121 def get_dict(self, timeout=-1):
117 """Get the results as a dict, keyed by engine_id."""
122 """Get the results as a dict, keyed by engine_id."""
118 results = self.get(timeout)
123 results = self.get(timeout)
119 engine_ids = [ md['engine_id'] for md in self._metadata ]
124 engine_ids = [ md['engine_id'] for md in self._metadata ]
120 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
125 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
121 maxcount = bycount.count(bycount[-1])
126 maxcount = bycount.count(bycount[-1])
122 if maxcount > 1:
127 if maxcount > 1:
123 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
128 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
124 maxcount, bycount[-1]))
129 maxcount, bycount[-1]))
125
130
126 return dict(zip(engine_ids,results))
131 return dict(zip(engine_ids,results))
127
132
128 @property
133 @property
129 @check_ready
134 @check_ready
130 def result(self):
135 def result(self):
131 """result property."""
136 """result property."""
132 return self._result
137 return self._result
133
138
134 # abbreviated alias:
139 # abbreviated alias:
135 r = result
140 r = result
136
141
137 @property
142 @property
138 @check_ready
143 @check_ready
139 def metadata(self):
144 def metadata(self):
140 """metadata property."""
145 """metadata property."""
141 if self._flatten_result:
146 if self._single_result:
142 return self._metadata[0]
147 return self._metadata[0]
143 else:
148 else:
144 return self._metadata
149 return self._metadata
145
150
146 @property
151 @property
147 def result_dict(self):
152 def result_dict(self):
148 """result property as a dict."""
153 """result property as a dict."""
149 return self.get_dict(0)
154 return self.get_dict(0)
150
155
151 def __dict__(self):
156 def __dict__(self):
152 return self.get_dict(0)
157 return self.get_dict(0)
153
158
154 #-------------------------------------
159 #-------------------------------------
155 # dict-access
160 # dict-access
156 #-------------------------------------
161 #-------------------------------------
157
162
158 @check_ready
163 @check_ready
159 def __getitem__(self, key):
164 def __getitem__(self, key):
160 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
165 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
161 """
166 """
162 if isinstance(key, int):
167 if isinstance(key, int):
163 return error.collect_exceptions([self._result[key]], self._fname)[0]
168 return error.collect_exceptions([self._result[key]], self._fname)[0]
164 elif isinstance(key, slice):
169 elif isinstance(key, slice):
165 return error.collect_exceptions(self._result[key], self._fname)
170 return error.collect_exceptions(self._result[key], self._fname)
166 elif isinstance(key, basestring):
171 elif isinstance(key, basestring):
167 values = [ md[key] for md in self._metadata ]
172 values = [ md[key] for md in self._metadata ]
168 if self._flatten_result:
173 if self._single_result:
169 return values[0]
174 return values[0]
170 else:
175 else:
171 return values
176 return values
172 else:
177 else:
173 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
178 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
174
179
175 @check_ready
180 @check_ready
176 def __getattr__(self, key):
181 def __getattr__(self, key):
177 """getattr maps to getitem for convenient access to metadata."""
182 """getattr maps to getitem for convenient access to metadata."""
178 if key not in self._metadata[0].keys():
183 if key not in self._metadata[0].keys():
179 raise AttributeError("%r object has no attribute %r"%(
184 raise AttributeError("%r object has no attribute %r"%(
180 self.__class__.__name__, key))
185 self.__class__.__name__, key))
181 return self.__getitem__(key)
186 return self.__getitem__(key)
182
187
183
188
184 class AsyncMapResult(AsyncResult):
189 class AsyncMapResult(AsyncResult):
185 """Class for representing results of non-blocking gathers.
190 """Class for representing results of non-blocking gathers.
186
191
187 This will properly reconstruct the gather.
192 This will properly reconstruct the gather.
188 """
193 """
189
194
190 def __init__(self, client, msg_ids, mapObject, fname=''):
195 def __init__(self, client, msg_ids, mapObject, fname=''):
191 AsyncResult.__init__(self, client, msg_ids, fname=fname)
196 AsyncResult.__init__(self, client, msg_ids, fname=fname)
192 self._mapObject = mapObject
197 self._mapObject = mapObject
193 self._flatten_result = False
198 self._single_result = False
194
199
195 def _reconstruct_result(self, res):
200 def _reconstruct_result(self, res):
196 """Perform the gather on the actual results."""
201 """Perform the gather on the actual results."""
197 return self._mapObject.joinPartitions(res)
202 return self._mapObject.joinPartitions(res)
198
203
199
204
200 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
205 __all__ = ['AsyncResult', 'AsyncMapResult']
@@ -1,1206 +1,1216
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import time
14 import time
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17 from datetime import datetime
17 from datetime import datetime
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.external.decorator import decorator
22 from IPython.external.decorator import decorator
23 from IPython.zmq import tunnel
23 from IPython.zmq import tunnel
24
24
25 import streamsession as ss
25 import streamsession as ss
26 # from remotenamespace import RemoteNamespace
26 # from remotenamespace import RemoteNamespace
27 from view import DirectView, LoadBalancedView
27 from view import DirectView, LoadBalancedView
28 from dependency import Dependency, depend, require
28 from dependency import Dependency, depend, require
29 import error
29 import error
30 import map as Map
30 import map as Map
31 from asyncresult import AsyncResult, AsyncMapResult
31 from asyncresult import AsyncResult, AsyncMapResult
32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
33 from util import ReverseDict
33 from util import ReverseDict
34
34
35 #--------------------------------------------------------------------------
35 #--------------------------------------------------------------------------
36 # helpers for implementing old MEC API via client.apply
36 # helpers for implementing old MEC API via client.apply
37 #--------------------------------------------------------------------------
37 #--------------------------------------------------------------------------
38
38
39 def _push(ns):
39 def _push(ns):
40 """helper method for implementing `client.push` via `client.apply`"""
40 """helper method for implementing `client.push` via `client.apply`"""
41 globals().update(ns)
41 globals().update(ns)
42
42
43 def _pull(keys):
43 def _pull(keys):
44 """helper method for implementing `client.pull` via `client.apply`"""
44 """helper method for implementing `client.pull` via `client.apply`"""
45 g = globals()
45 g = globals()
46 if isinstance(keys, (list,tuple, set)):
46 if isinstance(keys, (list,tuple, set)):
47 for key in keys:
47 for key in keys:
48 if not g.has_key(key):
48 if not g.has_key(key):
49 raise NameError("name '%s' is not defined"%key)
49 raise NameError("name '%s' is not defined"%key)
50 return map(g.get, keys)
50 return map(g.get, keys)
51 else:
51 else:
52 if not g.has_key(keys):
52 if not g.has_key(keys):
53 raise NameError("name '%s' is not defined"%keys)
53 raise NameError("name '%s' is not defined"%keys)
54 return g.get(keys)
54 return g.get(keys)
55
55
56 def _clear():
56 def _clear():
57 """helper method for implementing `client.clear` via `client.apply`"""
57 """helper method for implementing `client.clear` via `client.apply`"""
58 globals().clear()
58 globals().clear()
59
59
60 def _execute(code):
60 def _execute(code):
61 """helper method for implementing `client.execute` via `client.apply`"""
61 """helper method for implementing `client.execute` via `client.apply`"""
62 exec code in globals()
62 exec code in globals()
63
63
64
64
65 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
66 # Decorators for Client methods
66 # Decorators for Client methods
67 #--------------------------------------------------------------------------
67 #--------------------------------------------------------------------------
68
68
69 @decorator
69 @decorator
70 def spinfirst(f, self, *args, **kwargs):
70 def spinfirst(f, self, *args, **kwargs):
71 """Call spin() to sync state prior to calling the method."""
71 """Call spin() to sync state prior to calling the method."""
72 self.spin()
72 self.spin()
73 return f(self, *args, **kwargs)
73 return f(self, *args, **kwargs)
74
74
75 @decorator
75 @decorator
76 def defaultblock(f, self, *args, **kwargs):
76 def defaultblock(f, self, *args, **kwargs):
77 """Default to self.block; preserve self.block."""
77 """Default to self.block; preserve self.block."""
78 block = kwargs.get('block',None)
78 block = kwargs.get('block',None)
79 block = self.block if block is None else block
79 block = self.block if block is None else block
80 saveblock = self.block
80 saveblock = self.block
81 self.block = block
81 self.block = block
82 try:
82 try:
83 ret = f(self, *args, **kwargs)
83 ret = f(self, *args, **kwargs)
84 finally:
84 finally:
85 self.block = saveblock
85 self.block = saveblock
86 return ret
86 return ret
87
87
88
88
89 #--------------------------------------------------------------------------
89 #--------------------------------------------------------------------------
90 # Classes
90 # Classes
91 #--------------------------------------------------------------------------
91 #--------------------------------------------------------------------------
92
92
93 class Metadata(dict):
93 class Metadata(dict):
94 """Subclass of dict for initializing metadata values.
94 """Subclass of dict for initializing metadata values.
95
95
96 Attribute access works on keys.
96 Attribute access works on keys.
97
97
98 These objects have a strict set of keys - errors will raise if you try
98 These objects have a strict set of keys - errors will raise if you try
99 to add new keys.
99 to add new keys.
100 """
100 """
101 def __init__(self, *args, **kwargs):
101 def __init__(self, *args, **kwargs):
102 dict.__init__(self)
102 dict.__init__(self)
103 md = {'msg_id' : None,
103 md = {'msg_id' : None,
104 'submitted' : None,
104 'submitted' : None,
105 'started' : None,
105 'started' : None,
106 'completed' : None,
106 'completed' : None,
107 'received' : None,
107 'received' : None,
108 'engine_uuid' : None,
108 'engine_uuid' : None,
109 'engine_id' : None,
109 'engine_id' : None,
110 'follow' : None,
110 'follow' : None,
111 'after' : None,
111 'after' : None,
112 'status' : None,
112 'status' : None,
113
113
114 'pyin' : None,
114 'pyin' : None,
115 'pyout' : None,
115 'pyout' : None,
116 'pyerr' : None,
116 'pyerr' : None,
117 'stdout' : '',
117 'stdout' : '',
118 'stderr' : '',
118 'stderr' : '',
119 }
119 }
120 self.update(md)
120 self.update(md)
121 self.update(dict(*args, **kwargs))
121 self.update(dict(*args, **kwargs))
122
122
123 def __getattr__(self, key):
123 def __getattr__(self, key):
124 """getattr aliased to getitem"""
124 """getattr aliased to getitem"""
125 if key in self.iterkeys():
125 if key in self.iterkeys():
126 return self[key]
126 return self[key]
127 else:
127 else:
128 raise AttributeError(key)
128 raise AttributeError(key)
129
129
130 def __setattr__(self, key, value):
130 def __setattr__(self, key, value):
131 """setattr aliased to setitem, with strict"""
131 """setattr aliased to setitem, with strict"""
132 if key in self.iterkeys():
132 if key in self.iterkeys():
133 self[key] = value
133 self[key] = value
134 else:
134 else:
135 raise AttributeError(key)
135 raise AttributeError(key)
136
136
137 def __setitem__(self, key, value):
137 def __setitem__(self, key, value):
138 """strict static key enforcement"""
138 """strict static key enforcement"""
139 if key in self.iterkeys():
139 if key in self.iterkeys():
140 dict.__setitem__(self, key, value)
140 dict.__setitem__(self, key, value)
141 else:
141 else:
142 raise KeyError(key)
142 raise KeyError(key)
143
143
144
144
145 class Client(object):
145 class Client(object):
146 """A semi-synchronous client to the IPython ZMQ controller
146 """A semi-synchronous client to the IPython ZMQ controller
147
147
148 Parameters
148 Parameters
149 ----------
149 ----------
150
150
151 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
151 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
152 The address of the controller's registration socket.
152 The address of the controller's registration socket.
153 [Default: 'tcp://127.0.0.1:10101']
153 [Default: 'tcp://127.0.0.1:10101']
154 context : zmq.Context
154 context : zmq.Context
155 Pass an existing zmq.Context instance, otherwise the client will create its own
155 Pass an existing zmq.Context instance, otherwise the client will create its own
156 username : bytes
156 username : bytes
157 set username to be passed to the Session object
157 set username to be passed to the Session object
158 debug : bool
158 debug : bool
159 flag for lots of message printing for debug purposes
159 flag for lots of message printing for debug purposes
160
160
161 #-------------- ssh related args ----------------
161 #-------------- ssh related args ----------------
162 # These are args for configuring the ssh tunnel to be used
162 # These are args for configuring the ssh tunnel to be used
163 # credentials are used to forward connections over ssh to the Controller
163 # credentials are used to forward connections over ssh to the Controller
164 # Note that the ip given in `addr` needs to be relative to sshserver
164 # Note that the ip given in `addr` needs to be relative to sshserver
165 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
165 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
166 # and set sshserver as the same machine the Controller is on. However,
166 # and set sshserver as the same machine the Controller is on. However,
167 # the only requirement is that sshserver is able to see the Controller
167 # the only requirement is that sshserver is able to see the Controller
168 # (i.e. is within the same trusted network).
168 # (i.e. is within the same trusted network).
169
169
170 sshserver : str
170 sshserver : str
171 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
171 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
172 If keyfile or password is specified, and this is not, it will default to
172 If keyfile or password is specified, and this is not, it will default to
173 the ip given in addr.
173 the ip given in addr.
174 sshkey : str; path to public ssh key file
174 sshkey : str; path to public ssh key file
175 This specifies a key to be used in ssh login, default None.
175 This specifies a key to be used in ssh login, default None.
176 Regular default ssh keys will be used without specifying this argument.
176 Regular default ssh keys will be used without specifying this argument.
177 password : str;
177 password : str;
178 Your ssh password to sshserver. Note that if this is left None,
178 Your ssh password to sshserver. Note that if this is left None,
179 you will be prompted for it if passwordless key based login is unavailable.
179 you will be prompted for it if passwordless key based login is unavailable.
180
180
181 #------- exec authentication args -------
181 #------- exec authentication args -------
182 # If even localhost is untrusted, you can have some protection against
182 # If even localhost is untrusted, you can have some protection against
183 # unauthorized execution by using a key. Messages are still sent
183 # unauthorized execution by using a key. Messages are still sent
184 # as cleartext, so if someone can snoop your loopback traffic this will
184 # as cleartext, so if someone can snoop your loopback traffic this will
185 # not help anything.
185 # not help anything.
186
186
187 exec_key : str
187 exec_key : str
188 an authentication key or file containing a key
188 an authentication key or file containing a key
189 default: None
189 default: None
190
190
191
191
192 Attributes
192 Attributes
193 ----------
193 ----------
194 ids : set of int engine IDs
194 ids : set of int engine IDs
195 requesting the ids attribute always synchronizes
195 requesting the ids attribute always synchronizes
196 the registration state. To request ids without synchronization,
196 the registration state. To request ids without synchronization,
197 use semi-private _ids attributes.
197 use semi-private _ids attributes.
198
198
199 history : list of msg_ids
199 history : list of msg_ids
200 a list of msg_ids, keeping track of all the execution
200 a list of msg_ids, keeping track of all the execution
201 messages you have submitted in order.
201 messages you have submitted in order.
202
202
203 outstanding : set of msg_ids
203 outstanding : set of msg_ids
204 a set of msg_ids that have been submitted, but whose
204 a set of msg_ids that have been submitted, but whose
205 results have not yet been received.
205 results have not yet been received.
206
206
207 results : dict
207 results : dict
208 a dict of all our results, keyed by msg_id
208 a dict of all our results, keyed by msg_id
209
209
210 block : bool
210 block : bool
211 determines default behavior when block not specified
211 determines default behavior when block not specified
212 in execution methods
212 in execution methods
213
213
214 Methods
214 Methods
215 -------
215 -------
216 spin : flushes incoming results and registration state changes
216 spin : flushes incoming results and registration state changes
217 control methods spin, and requesting `ids` also ensures up to date
217 control methods spin, and requesting `ids` also ensures up to date
218
218
219 barrier : wait on one or more msg_ids
219 barrier : wait on one or more msg_ids
220
220
221 execution methods: apply/apply_bound/apply_to/apply_bound
221 execution methods: apply/apply_bound/apply_to/apply_bound
222 legacy: execute, run
222 legacy: execute, run
223
223
224 query methods: queue_status, get_result, purge
224 query methods: queue_status, get_result, purge
225
225
226 control methods: abort, kill
226 control methods: abort, kill
227
227
228 """
228 """
229
229
230
230
231 _connected=False
231 _connected=False
232 _ssh=False
232 _ssh=False
233 _engines=None
233 _engines=None
234 _addr='tcp://127.0.0.1:10101'
234 _addr='tcp://127.0.0.1:10101'
235 _registration_socket=None
235 _registration_socket=None
236 _query_socket=None
236 _query_socket=None
237 _control_socket=None
237 _control_socket=None
238 _iopub_socket=None
238 _iopub_socket=None
239 _notification_socket=None
239 _notification_socket=None
240 _mux_socket=None
240 _mux_socket=None
241 _task_socket=None
241 _task_socket=None
242 block = False
242 block = False
243 outstanding=None
243 outstanding=None
244 results = None
244 results = None
245 history = None
245 history = None
246 debug = False
246 debug = False
247 targets = None
247 targets = None
248
248
249 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
249 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
250 sshserver=None, sshkey=None, password=None, paramiko=None,
250 sshserver=None, sshkey=None, password=None, paramiko=None,
251 exec_key=None,):
251 exec_key=None,):
252 if context is None:
252 if context is None:
253 context = zmq.Context()
253 context = zmq.Context()
254 self.context = context
254 self.context = context
255 self.targets = 'all'
255 self.targets = 'all'
256 self._addr = addr
256 self._addr = addr
257 self._ssh = bool(sshserver or sshkey or password)
257 self._ssh = bool(sshserver or sshkey or password)
258 if self._ssh and sshserver is None:
258 if self._ssh and sshserver is None:
259 # default to the same
259 # default to the same
260 sshserver = addr.split('://')[1].split(':')[0]
260 sshserver = addr.split('://')[1].split(':')[0]
261 if self._ssh and password is None:
261 if self._ssh and password is None:
262 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
262 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
263 password=False
263 password=False
264 else:
264 else:
265 password = getpass("SSH Password for %s: "%sshserver)
265 password = getpass("SSH Password for %s: "%sshserver)
266 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
266 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
267
267
268 if exec_key is not None and os.path.isfile(exec_key):
268 if exec_key is not None and os.path.isfile(exec_key):
269 arg = 'keyfile'
269 arg = 'keyfile'
270 else:
270 else:
271 arg = 'key'
271 arg = 'key'
272 key_arg = {arg:exec_key}
272 key_arg = {arg:exec_key}
273 if username is None:
273 if username is None:
274 self.session = ss.StreamSession(**key_arg)
274 self.session = ss.StreamSession(**key_arg)
275 else:
275 else:
276 self.session = ss.StreamSession(username, **key_arg)
276 self.session = ss.StreamSession(username, **key_arg)
277 self._registration_socket = self.context.socket(zmq.XREQ)
277 self._registration_socket = self.context.socket(zmq.XREQ)
278 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
278 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
279 if self._ssh:
279 if self._ssh:
280 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
280 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
281 else:
281 else:
282 self._registration_socket.connect(addr)
282 self._registration_socket.connect(addr)
283 self._engines = ReverseDict()
283 self._engines = ReverseDict()
284 self._ids = set()
284 self._ids = set()
285 self.outstanding=set()
285 self.outstanding=set()
286 self.results = {}
286 self.results = {}
287 self.metadata = {}
287 self.metadata = {}
288 self.history = []
288 self.history = []
289 self.debug = debug
289 self.debug = debug
290 self.session.debug = debug
290 self.session.debug = debug
291
291
292 self._notification_handlers = {'registration_notification' : self._register_engine,
292 self._notification_handlers = {'registration_notification' : self._register_engine,
293 'unregistration_notification' : self._unregister_engine,
293 'unregistration_notification' : self._unregister_engine,
294 }
294 }
295 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
295 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
296 'apply_reply' : self._handle_apply_reply}
296 'apply_reply' : self._handle_apply_reply}
297 self._connect(sshserver, ssh_kwargs)
297 self._connect(sshserver, ssh_kwargs)
298
298
299
299
300 @property
300 @property
301 def ids(self):
301 def ids(self):
302 """Always up to date ids property."""
302 """Always up to date ids property."""
303 self._flush_notifications()
303 self._flush_notifications()
304 return self._ids
304 return self._ids
305
305
306 def _update_engines(self, engines):
306 def _update_engines(self, engines):
307 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
307 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
308 for k,v in engines.iteritems():
308 for k,v in engines.iteritems():
309 eid = int(k)
309 eid = int(k)
310 self._engines[eid] = bytes(v) # force not unicode
310 self._engines[eid] = bytes(v) # force not unicode
311 self._ids.add(eid)
311 self._ids.add(eid)
312
312
313 def _build_targets(self, targets):
313 def _build_targets(self, targets):
314 """Turn valid target IDs or 'all' into two lists:
314 """Turn valid target IDs or 'all' into two lists:
315 (int_ids, uuids).
315 (int_ids, uuids).
316 """
316 """
317 if targets is None:
317 if targets is None:
318 targets = self._ids
318 targets = self._ids
319 elif isinstance(targets, str):
319 elif isinstance(targets, str):
320 if targets.lower() == 'all':
320 if targets.lower() == 'all':
321 targets = self._ids
321 targets = self._ids
322 else:
322 else:
323 raise TypeError("%r not valid str target, must be 'all'"%(targets))
323 raise TypeError("%r not valid str target, must be 'all'"%(targets))
324 elif isinstance(targets, int):
324 elif isinstance(targets, int):
325 targets = [targets]
325 targets = [targets]
326 return [self._engines[t] for t in targets], list(targets)
326 return [self._engines[t] for t in targets], list(targets)
327
327
328 def _connect(self, sshserver, ssh_kwargs):
328 def _connect(self, sshserver, ssh_kwargs):
329 """setup all our socket connections to the controller. This is called from
329 """setup all our socket connections to the controller. This is called from
330 __init__."""
330 __init__."""
331 if self._connected:
331 if self._connected:
332 return
332 return
333 self._connected=True
333 self._connected=True
334
334
335 def connect_socket(s, addr):
335 def connect_socket(s, addr):
336 if self._ssh:
336 if self._ssh:
337 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
337 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
338 else:
338 else:
339 return s.connect(addr)
339 return s.connect(addr)
340
340
341 self.session.send(self._registration_socket, 'connection_request')
341 self.session.send(self._registration_socket, 'connection_request')
342 idents,msg = self.session.recv(self._registration_socket,mode=0)
342 idents,msg = self.session.recv(self._registration_socket,mode=0)
343 if self.debug:
343 if self.debug:
344 pprint(msg)
344 pprint(msg)
345 msg = ss.Message(msg)
345 msg = ss.Message(msg)
346 content = msg.content
346 content = msg.content
347 if content.status == 'ok':
347 if content.status == 'ok':
348 if content.mux:
348 if content.mux:
349 self._mux_socket = self.context.socket(zmq.PAIR)
349 self._mux_socket = self.context.socket(zmq.PAIR)
350 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
350 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
351 connect_socket(self._mux_socket, content.mux)
351 connect_socket(self._mux_socket, content.mux)
352 if content.task:
352 if content.task:
353 self._task_socket = self.context.socket(zmq.PAIR)
353 self._task_socket = self.context.socket(zmq.PAIR)
354 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
354 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
355 connect_socket(self._task_socket, content.task)
355 connect_socket(self._task_socket, content.task)
356 if content.notification:
356 if content.notification:
357 self._notification_socket = self.context.socket(zmq.SUB)
357 self._notification_socket = self.context.socket(zmq.SUB)
358 connect_socket(self._notification_socket, content.notification)
358 connect_socket(self._notification_socket, content.notification)
359 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
359 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
360 if content.query:
360 if content.query:
361 self._query_socket = self.context.socket(zmq.PAIR)
361 self._query_socket = self.context.socket(zmq.PAIR)
362 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
362 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
363 connect_socket(self._query_socket, content.query)
363 connect_socket(self._query_socket, content.query)
364 if content.control:
364 if content.control:
365 self._control_socket = self.context.socket(zmq.PAIR)
365 self._control_socket = self.context.socket(zmq.PAIR)
366 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
366 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
367 connect_socket(self._control_socket, content.control)
367 connect_socket(self._control_socket, content.control)
368 if content.iopub:
368 if content.iopub:
369 self._iopub_socket = self.context.socket(zmq.SUB)
369 self._iopub_socket = self.context.socket(zmq.SUB)
370 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
370 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
371 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
371 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
372 connect_socket(self._iopub_socket, content.iopub)
372 connect_socket(self._iopub_socket, content.iopub)
373 self._update_engines(dict(content.engines))
373 self._update_engines(dict(content.engines))
374
374
375 else:
375 else:
376 self._connected = False
376 self._connected = False
377 raise Exception("Failed to connect!")
377 raise Exception("Failed to connect!")
378
378
379 #--------------------------------------------------------------------------
379 #--------------------------------------------------------------------------
380 # handlers and callbacks for incoming messages
380 # handlers and callbacks for incoming messages
381 #--------------------------------------------------------------------------
381 #--------------------------------------------------------------------------
382
382
383 def _register_engine(self, msg):
383 def _register_engine(self, msg):
384 """Register a new engine, and update our connection info."""
384 """Register a new engine, and update our connection info."""
385 content = msg['content']
385 content = msg['content']
386 eid = content['id']
386 eid = content['id']
387 d = {eid : content['queue']}
387 d = {eid : content['queue']}
388 self._update_engines(d)
388 self._update_engines(d)
389 self._ids.add(int(eid))
389 self._ids.add(int(eid))
390
390
391 def _unregister_engine(self, msg):
391 def _unregister_engine(self, msg):
392 """Unregister an engine that has died."""
392 """Unregister an engine that has died."""
393 content = msg['content']
393 content = msg['content']
394 eid = int(content['id'])
394 eid = int(content['id'])
395 if eid in self._ids:
395 if eid in self._ids:
396 self._ids.remove(eid)
396 self._ids.remove(eid)
397 self._engines.pop(eid)
397 self._engines.pop(eid)
398
398
399 def _extract_metadata(self, header, parent, content):
399 def _extract_metadata(self, header, parent, content):
400 md = {'msg_id' : parent['msg_id'],
400 md = {'msg_id' : parent['msg_id'],
401 'received' : datetime.now(),
401 'received' : datetime.now(),
402 'engine_uuid' : header.get('engine', None),
402 'engine_uuid' : header.get('engine', None),
403 'follow' : parent['follow'],
403 'follow' : parent['follow'],
404 'after' : parent['after'],
404 'after' : parent['after'],
405 'status' : content['status'],
405 'status' : content['status'],
406 }
406 }
407
407
408 if md['engine_uuid'] is not None:
408 if md['engine_uuid'] is not None:
409 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
409 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
410
410
411 if 'date' in parent:
411 if 'date' in parent:
412 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
412 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
413 if 'started' in header:
413 if 'started' in header:
414 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
414 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
415 if 'date' in header:
415 if 'date' in header:
416 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
416 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
417 return md
417 return md
418
418
419 def _handle_execute_reply(self, msg):
419 def _handle_execute_reply(self, msg):
420 """Save the reply to an execute_request into our results.
420 """Save the reply to an execute_request into our results.
421
421
422 execute messages are never actually used. apply is used instead.
422 execute messages are never actually used. apply is used instead.
423 """
423 """
424
424
425 parent = msg['parent_header']
425 parent = msg['parent_header']
426 msg_id = parent['msg_id']
426 msg_id = parent['msg_id']
427 if msg_id not in self.outstanding:
427 if msg_id not in self.outstanding:
428 if msg_id in self.history:
428 if msg_id in self.history:
429 print ("got stale result: %s"%msg_id)
429 print ("got stale result: %s"%msg_id)
430 else:
430 else:
431 print ("got unknown result: %s"%msg_id)
431 print ("got unknown result: %s"%msg_id)
432 else:
432 else:
433 self.outstanding.remove(msg_id)
433 self.outstanding.remove(msg_id)
434 self.results[msg_id] = ss.unwrap_exception(msg['content'])
434 self.results[msg_id] = ss.unwrap_exception(msg['content'])
435
435
436 def _handle_apply_reply(self, msg):
436 def _handle_apply_reply(self, msg):
437 """Save the reply to an apply_request into our results."""
437 """Save the reply to an apply_request into our results."""
438 parent = msg['parent_header']
438 parent = msg['parent_header']
439 msg_id = parent['msg_id']
439 msg_id = parent['msg_id']
440 if msg_id not in self.outstanding:
440 if msg_id not in self.outstanding:
441 if msg_id in self.history:
441 if msg_id in self.history:
442 print ("got stale result: %s"%msg_id)
442 print ("got stale result: %s"%msg_id)
443 print self.results[msg_id]
443 print self.results[msg_id]
444 print msg
444 print msg
445 else:
445 else:
446 print ("got unknown result: %s"%msg_id)
446 print ("got unknown result: %s"%msg_id)
447 else:
447 else:
448 self.outstanding.remove(msg_id)
448 self.outstanding.remove(msg_id)
449 content = msg['content']
449 content = msg['content']
450 header = msg['header']
450 header = msg['header']
451
451
452 # construct metadata:
452 # construct metadata:
453 md = self.metadata.setdefault(msg_id, Metadata())
453 md = self.metadata.setdefault(msg_id, Metadata())
454 md.update(self._extract_metadata(header, parent, content))
454 md.update(self._extract_metadata(header, parent, content))
455 self.metadata[msg_id] = md
455 self.metadata[msg_id] = md
456
456
457 # construct result:
457 # construct result:
458 if content['status'] == 'ok':
458 if content['status'] == 'ok':
459 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
459 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
460 elif content['status'] == 'aborted':
460 elif content['status'] == 'aborted':
461 self.results[msg_id] = error.AbortedTask(msg_id)
461 self.results[msg_id] = error.AbortedTask(msg_id)
462 elif content['status'] == 'resubmitted':
462 elif content['status'] == 'resubmitted':
463 # TODO: handle resubmission
463 # TODO: handle resubmission
464 pass
464 pass
465 else:
465 else:
466 e = ss.unwrap_exception(content)
466 e = ss.unwrap_exception(content)
467 if e.engine_info:
467 if e.engine_info:
468 e_uuid = e.engine_info['engineid']
468 e_uuid = e.engine_info['engineid']
469 eid = self._engines[e_uuid]
469 eid = self._engines[e_uuid]
470 e.engine_info['engineid'] = eid
470 e.engine_info['engineid'] = eid
471 self.results[msg_id] = e
471 self.results[msg_id] = e
472
472
473 def _flush_notifications(self):
473 def _flush_notifications(self):
474 """Flush notifications of engine registrations waiting
474 """Flush notifications of engine registrations waiting
475 in ZMQ queue."""
475 in ZMQ queue."""
476 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
476 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
477 while msg is not None:
477 while msg is not None:
478 if self.debug:
478 if self.debug:
479 pprint(msg)
479 pprint(msg)
480 msg = msg[-1]
480 msg = msg[-1]
481 msg_type = msg['msg_type']
481 msg_type = msg['msg_type']
482 handler = self._notification_handlers.get(msg_type, None)
482 handler = self._notification_handlers.get(msg_type, None)
483 if handler is None:
483 if handler is None:
484 raise Exception("Unhandled message type: %s"%msg.msg_type)
484 raise Exception("Unhandled message type: %s"%msg.msg_type)
485 else:
485 else:
486 handler(msg)
486 handler(msg)
487 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
487 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
488
488
489 def _flush_results(self, sock):
489 def _flush_results(self, sock):
490 """Flush task or queue results waiting in ZMQ queue."""
490 """Flush task or queue results waiting in ZMQ queue."""
491 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
491 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
492 while msg is not None:
492 while msg is not None:
493 if self.debug:
493 if self.debug:
494 pprint(msg)
494 pprint(msg)
495 msg = msg[-1]
495 msg = msg[-1]
496 msg_type = msg['msg_type']
496 msg_type = msg['msg_type']
497 handler = self._queue_handlers.get(msg_type, None)
497 handler = self._queue_handlers.get(msg_type, None)
498 if handler is None:
498 if handler is None:
499 raise Exception("Unhandled message type: %s"%msg.msg_type)
499 raise Exception("Unhandled message type: %s"%msg.msg_type)
500 else:
500 else:
501 handler(msg)
501 handler(msg)
502 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
502 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
503
503
504 def _flush_control(self, sock):
504 def _flush_control(self, sock):
505 """Flush replies from the control channel waiting
505 """Flush replies from the control channel waiting
506 in the ZMQ queue.
506 in the ZMQ queue.
507
507
508 Currently: ignore them."""
508 Currently: ignore them."""
509 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
509 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
510 while msg is not None:
510 while msg is not None:
511 if self.debug:
511 if self.debug:
512 pprint(msg)
512 pprint(msg)
513 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
513 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
514
514
515 def _flush_iopub(self, sock):
515 def _flush_iopub(self, sock):
516 """Flush replies from the iopub channel waiting
516 """Flush replies from the iopub channel waiting
517 in the ZMQ queue.
517 in the ZMQ queue.
518 """
518 """
519 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
519 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
520 while msg is not None:
520 while msg is not None:
521 if self.debug:
521 if self.debug:
522 pprint(msg)
522 pprint(msg)
523 msg = msg[-1]
523 msg = msg[-1]
524 parent = msg['parent_header']
524 parent = msg['parent_header']
525 msg_id = parent['msg_id']
525 msg_id = parent['msg_id']
526 content = msg['content']
526 content = msg['content']
527 header = msg['header']
527 header = msg['header']
528 msg_type = msg['msg_type']
528 msg_type = msg['msg_type']
529
529
530 # init metadata:
530 # init metadata:
531 md = self.metadata.setdefault(msg_id, Metadata())
531 md = self.metadata.setdefault(msg_id, Metadata())
532
532
533 if msg_type == 'stream':
533 if msg_type == 'stream':
534 name = content['name']
534 name = content['name']
535 s = md[name] or ''
535 s = md[name] or ''
536 md[name] = s + content['data']
536 md[name] = s + content['data']
537 elif msg_type == 'pyerr':
537 elif msg_type == 'pyerr':
538 md.update({'pyerr' : ss.unwrap_exception(content)})
538 md.update({'pyerr' : ss.unwrap_exception(content)})
539 else:
539 else:
540 md.update({msg_type : content['data']})
540 md.update({msg_type : content['data']})
541
541
542 self.metadata[msg_id] = md
542 self.metadata[msg_id] = md
543
543
544 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
544 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
545
545
546 #--------------------------------------------------------------------------
546 #--------------------------------------------------------------------------
547 # getitem
547 # getitem
548 #--------------------------------------------------------------------------
548 #--------------------------------------------------------------------------
549
549
550 def __getitem__(self, key):
550 def __getitem__(self, key):
551 """Dict access returns DirectView multiplexer objects or,
551 """Dict access returns DirectView multiplexer objects or,
552 if key is None, a LoadBalancedView."""
552 if key is None, a LoadBalancedView."""
553 if key is None:
553 if key is None:
554 return LoadBalancedView(self)
554 return LoadBalancedView(self)
555 if isinstance(key, int):
555 if isinstance(key, int):
556 if key not in self.ids:
556 if key not in self.ids:
557 raise IndexError("No such engine: %i"%key)
557 raise IndexError("No such engine: %i"%key)
558 return DirectView(self, key)
558 return DirectView(self, key)
559
559
560 if isinstance(key, slice):
560 if isinstance(key, slice):
561 indices = range(len(self.ids))[key]
561 indices = range(len(self.ids))[key]
562 ids = sorted(self._ids)
562 ids = sorted(self._ids)
563 key = [ ids[i] for i in indices ]
563 key = [ ids[i] for i in indices ]
564 # newkeys = sorted(self._ids)[thekeys[k]]
564 # newkeys = sorted(self._ids)[thekeys[k]]
565
565
566 if isinstance(key, (tuple, list, xrange)):
566 if isinstance(key, (tuple, list, xrange)):
567 _,targets = self._build_targets(list(key))
567 _,targets = self._build_targets(list(key))
568 return DirectView(self, targets)
568 return DirectView(self, targets)
569 else:
569 else:
570 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
570 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
571
571
572 #--------------------------------------------------------------------------
572 #--------------------------------------------------------------------------
573 # Begin public methods
573 # Begin public methods
574 #--------------------------------------------------------------------------
574 #--------------------------------------------------------------------------
575
575
576 @property
576 @property
577 def remote(self):
577 def remote(self):
578 """property for convenient RemoteFunction generation.
578 """property for convenient RemoteFunction generation.
579
579
580 >>> @client.remote
580 >>> @client.remote
581 ... def f():
581 ... def f():
582 import os
582 import os
583 print (os.getpid())
583 print (os.getpid())
584 """
584 """
585 return remote(self, block=self.block)
585 return remote(self, block=self.block)
586
586
587 def spin(self):
587 def spin(self):
588 """Flush any registration notifications and execution results
588 """Flush any registration notifications and execution results
589 waiting in the ZMQ queue.
589 waiting in the ZMQ queue.
590 """
590 """
591 if self._notification_socket:
591 if self._notification_socket:
592 self._flush_notifications()
592 self._flush_notifications()
593 if self._mux_socket:
593 if self._mux_socket:
594 self._flush_results(self._mux_socket)
594 self._flush_results(self._mux_socket)
595 if self._task_socket:
595 if self._task_socket:
596 self._flush_results(self._task_socket)
596 self._flush_results(self._task_socket)
597 if self._control_socket:
597 if self._control_socket:
598 self._flush_control(self._control_socket)
598 self._flush_control(self._control_socket)
599 if self._iopub_socket:
599 if self._iopub_socket:
600 self._flush_iopub(self._iopub_socket)
600 self._flush_iopub(self._iopub_socket)
601
601
602 def barrier(self, msg_ids=None, timeout=-1):
602 def barrier(self, msg_ids=None, timeout=-1):
603 """waits on one or more `msg_ids`, for up to `timeout` seconds.
603 """waits on one or more `msg_ids`, for up to `timeout` seconds.
604
604
605 Parameters
605 Parameters
606 ----------
606 ----------
607 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
607 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
608 ints are indices to self.history
608 ints are indices to self.history
609 strs are msg_ids
609 strs are msg_ids
610 default: wait on all outstanding messages
610 default: wait on all outstanding messages
611 timeout : float
611 timeout : float
612 a time in seconds, after which to give up.
612 a time in seconds, after which to give up.
613 default is -1, which means no timeout
613 default is -1, which means no timeout
614
614
615 Returns
615 Returns
616 -------
616 -------
617 True : when all msg_ids are done
617 True : when all msg_ids are done
618 False : timeout reached, some msg_ids still outstanding
618 False : timeout reached, some msg_ids still outstanding
619 """
619 """
620 tic = time.time()
620 tic = time.time()
621 if msg_ids is None:
621 if msg_ids is None:
622 theids = self.outstanding
622 theids = self.outstanding
623 else:
623 else:
624 if isinstance(msg_ids, (int, str, AsyncResult)):
624 if isinstance(msg_ids, (int, str, AsyncResult)):
625 msg_ids = [msg_ids]
625 msg_ids = [msg_ids]
626 theids = set()
626 theids = set()
627 for msg_id in msg_ids:
627 for msg_id in msg_ids:
628 if isinstance(msg_id, int):
628 if isinstance(msg_id, int):
629 msg_id = self.history[msg_id]
629 msg_id = self.history[msg_id]
630 elif isinstance(msg_id, AsyncResult):
630 elif isinstance(msg_id, AsyncResult):
631 map(theids.add, msg_id.msg_ids)
631 map(theids.add, msg_id.msg_ids)
632 continue
632 continue
633 theids.add(msg_id)
633 theids.add(msg_id)
634 if not theids.intersection(self.outstanding):
634 if not theids.intersection(self.outstanding):
635 return True
635 return True
636 self.spin()
636 self.spin()
637 while theids.intersection(self.outstanding):
637 while theids.intersection(self.outstanding):
638 if timeout >= 0 and ( time.time()-tic ) > timeout:
638 if timeout >= 0 and ( time.time()-tic ) > timeout:
639 break
639 break
640 time.sleep(1e-3)
640 time.sleep(1e-3)
641 self.spin()
641 self.spin()
642 return len(theids.intersection(self.outstanding)) == 0
642 return len(theids.intersection(self.outstanding)) == 0
643
643
644 #--------------------------------------------------------------------------
644 #--------------------------------------------------------------------------
645 # Control methods
645 # Control methods
646 #--------------------------------------------------------------------------
646 #--------------------------------------------------------------------------
647
647
648 @spinfirst
648 @spinfirst
649 @defaultblock
649 @defaultblock
650 def clear(self, targets=None, block=None):
650 def clear(self, targets=None, block=None):
651 """Clear the namespace in target(s)."""
651 """Clear the namespace in target(s)."""
652 targets = self._build_targets(targets)[0]
652 targets = self._build_targets(targets)[0]
653 for t in targets:
653 for t in targets:
654 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
654 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
655 error = False
655 error = False
656 if self.block:
656 if self.block:
657 for i in range(len(targets)):
657 for i in range(len(targets)):
658 idents,msg = self.session.recv(self._control_socket,0)
658 idents,msg = self.session.recv(self._control_socket,0)
659 if self.debug:
659 if self.debug:
660 pprint(msg)
660 pprint(msg)
661 if msg['content']['status'] != 'ok':
661 if msg['content']['status'] != 'ok':
662 error = ss.unwrap_exception(msg['content'])
662 error = ss.unwrap_exception(msg['content'])
663 if error:
663 if error:
664 return error
664 return error
665
665
666
666
667 @spinfirst
667 @spinfirst
668 @defaultblock
668 @defaultblock
669 def abort(self, msg_ids = None, targets=None, block=None):
669 def abort(self, msg_ids = None, targets=None, block=None):
670 """Abort the execution queues of target(s)."""
670 """Abort the execution queues of target(s)."""
671 targets = self._build_targets(targets)[0]
671 targets = self._build_targets(targets)[0]
672 if isinstance(msg_ids, basestring):
672 if isinstance(msg_ids, basestring):
673 msg_ids = [msg_ids]
673 msg_ids = [msg_ids]
674 content = dict(msg_ids=msg_ids)
674 content = dict(msg_ids=msg_ids)
675 for t in targets:
675 for t in targets:
676 self.session.send(self._control_socket, 'abort_request',
676 self.session.send(self._control_socket, 'abort_request',
677 content=content, ident=t)
677 content=content, ident=t)
678 error = False
678 error = False
679 if self.block:
679 if self.block:
680 for i in range(len(targets)):
680 for i in range(len(targets)):
681 idents,msg = self.session.recv(self._control_socket,0)
681 idents,msg = self.session.recv(self._control_socket,0)
682 if self.debug:
682 if self.debug:
683 pprint(msg)
683 pprint(msg)
684 if msg['content']['status'] != 'ok':
684 if msg['content']['status'] != 'ok':
685 error = ss.unwrap_exception(msg['content'])
685 error = ss.unwrap_exception(msg['content'])
686 if error:
686 if error:
687 return error
687 return error
688
688
689 @spinfirst
689 @spinfirst
690 @defaultblock
690 @defaultblock
691 def shutdown(self, targets=None, restart=False, controller=False, block=None):
691 def shutdown(self, targets=None, restart=False, controller=False, block=None):
692 """Terminates one or more engine processes, optionally including the controller."""
692 """Terminates one or more engine processes, optionally including the controller."""
693 if controller:
693 if controller:
694 targets = 'all'
694 targets = 'all'
695 targets = self._build_targets(targets)[0]
695 targets = self._build_targets(targets)[0]
696 for t in targets:
696 for t in targets:
697 self.session.send(self._control_socket, 'shutdown_request',
697 self.session.send(self._control_socket, 'shutdown_request',
698 content={'restart':restart},ident=t)
698 content={'restart':restart},ident=t)
699 error = False
699 error = False
700 if block or controller:
700 if block or controller:
701 for i in range(len(targets)):
701 for i in range(len(targets)):
702 idents,msg = self.session.recv(self._control_socket,0)
702 idents,msg = self.session.recv(self._control_socket,0)
703 if self.debug:
703 if self.debug:
704 pprint(msg)
704 pprint(msg)
705 if msg['content']['status'] != 'ok':
705 if msg['content']['status'] != 'ok':
706 error = ss.unwrap_exception(msg['content'])
706 error = ss.unwrap_exception(msg['content'])
707
707
708 if controller:
708 if controller:
709 time.sleep(0.25)
709 time.sleep(0.25)
710 self.session.send(self._query_socket, 'shutdown_request')
710 self.session.send(self._query_socket, 'shutdown_request')
711 idents,msg = self.session.recv(self._query_socket, 0)
711 idents,msg = self.session.recv(self._query_socket, 0)
712 if self.debug:
712 if self.debug:
713 pprint(msg)
713 pprint(msg)
714 if msg['content']['status'] != 'ok':
714 if msg['content']['status'] != 'ok':
715 error = ss.unwrap_exception(msg['content'])
715 error = ss.unwrap_exception(msg['content'])
716
716
717 if error:
717 if error:
718 raise error
718 raise error
719
719
720 #--------------------------------------------------------------------------
720 #--------------------------------------------------------------------------
721 # Execution methods
721 # Execution methods
722 #--------------------------------------------------------------------------
722 #--------------------------------------------------------------------------
723
723
724 @defaultblock
724 @defaultblock
725 def execute(self, code, targets='all', block=None):
725 def execute(self, code, targets='all', block=None):
726 """Executes `code` on `targets` in blocking or nonblocking manner.
726 """Executes `code` on `targets` in blocking or nonblocking manner.
727
727
728 ``execute`` is always `bound` (affects engine namespace)
728 ``execute`` is always `bound` (affects engine namespace)
729
729
730 Parameters
730 Parameters
731 ----------
731 ----------
732 code : str
732 code : str
733 the code string to be executed
733 the code string to be executed
734 targets : int/str/list of ints/strs
734 targets : int/str/list of ints/strs
735 the engines on which to execute
735 the engines on which to execute
736 default : all
736 default : all
737 block : bool
737 block : bool
738 whether or not to wait until done to return
738 whether or not to wait until done to return
739 default: self.block
739 default: self.block
740 """
740 """
741 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
741 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
742 return result
742 return result
743
743
744 def run(self, code, block=None):
744 def run(self, code, block=None):
745 """Runs `code` on an engine.
745 """Runs `code` on an engine.
746
746
747 Calls to this are load-balanced.
747 Calls to this are load-balanced.
748
748
749 ``run`` is never `bound` (no effect on engine namespace)
749 ``run`` is never `bound` (no effect on engine namespace)
750
750
751 Parameters
751 Parameters
752 ----------
752 ----------
753 code : str
753 code : str
754 the code string to be executed
754 the code string to be executed
755 block : bool
755 block : bool
756 whether or not to wait until done
756 whether or not to wait until done
757
757
758 """
758 """
759 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
759 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
760 return result
760 return result
761
761
762 def _maybe_raise(self, result):
762 def _maybe_raise(self, result):
763 """wrapper for maybe raising an exception if apply failed."""
763 """wrapper for maybe raising an exception if apply failed."""
764 if isinstance(result, error.RemoteError):
764 if isinstance(result, error.RemoteError):
765 raise result
765 raise result
766
766
767 return result
767 return result
768
768
769 def _build_dependency(self, dep):
770 """helper for building jsonable dependencies from various input forms"""
771 if isinstance(dep, Dependency):
772 return dep.as_dict()
773 elif isinstance(dep, AsyncResult):
774 return dep.msg_ids
775 elif dep is None:
776 return []
777 elif isinstance(dep, set):
778 return list(dep)
779 elif isinstance(dep, (list,dict)):
780 return dep
781 elif isinstance(dep, str):
782 return [dep]
783 else:
784 raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
785
769 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
786 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
770 after=None, follow=None):
787 after=None, follow=None, timeout=None):
771 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
788 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
772
789
773 This is the central execution command for the client.
790 This is the central execution command for the client.
774
791
775 Parameters
792 Parameters
776 ----------
793 ----------
777
794
778 f : function
795 f : function
779 The fuction to be called remotely
796 The fuction to be called remotely
780 args : tuple/list
797 args : tuple/list
781 The positional arguments passed to `f`
798 The positional arguments passed to `f`
782 kwargs : dict
799 kwargs : dict
783 The keyword arguments passed to `f`
800 The keyword arguments passed to `f`
784 bound : bool (default: True)
801 bound : bool (default: True)
785 Whether to execute in the Engine(s) namespace, or in a clean
802 Whether to execute in the Engine(s) namespace, or in a clean
786 namespace not affecting the engine.
803 namespace not affecting the engine.
787 block : bool (default: self.block)
804 block : bool (default: self.block)
788 Whether to wait for the result, or return immediately.
805 Whether to wait for the result, or return immediately.
789 False:
806 False:
790 returns msg_id(s)
807 returns msg_id(s)
791 if multiple targets:
808 if multiple targets:
792 list of ids
809 list of ids
793 True:
810 True:
794 returns actual result(s) of f(*args, **kwargs)
811 returns actual result(s) of f(*args, **kwargs)
795 if multiple targets:
812 if multiple targets:
796 dict of results, by engine ID
813 dict of results, by engine ID
797 targets : int,list of ints, 'all', None
814 targets : int,list of ints, 'all', None
798 Specify the destination of the job.
815 Specify the destination of the job.
799 if None:
816 if None:
800 Submit via Task queue for load-balancing.
817 Submit via Task queue for load-balancing.
801 if 'all':
818 if 'all':
802 Run on all active engines
819 Run on all active engines
803 if list:
820 if list:
804 Run on each specified engine
821 Run on each specified engine
805 if int:
822 if int:
806 Run on single engine
823 Run on single engine
807
824
808 after : Dependency or collection of msg_ids
825 after : Dependency or collection of msg_ids
809 Only for load-balanced execution (targets=None)
826 Only for load-balanced execution (targets=None)
810 Specify a list of msg_ids as a time-based dependency.
827 Specify a list of msg_ids as a time-based dependency.
811 This job will only be run *after* the dependencies
828 This job will only be run *after* the dependencies
812 have been met.
829 have been met.
813
830
814 follow : Dependency or collection of msg_ids
831 follow : Dependency or collection of msg_ids
815 Only for load-balanced execution (targets=None)
832 Only for load-balanced execution (targets=None)
816 Specify a list of msg_ids as a location-based dependency.
833 Specify a list of msg_ids as a location-based dependency.
817 This job will only be run on an engine where this dependency
834 This job will only be run on an engine where this dependency
818 is met.
835 is met.
819
836
837 timeout : float or None
838 Only for load-balanced execution (targets=None)
839 Specify an amount of time (in seconds)
840
820 Returns
841 Returns
821 -------
842 -------
822 if block is False:
843 if block is False:
823 if single target:
844 if single target:
824 return msg_id
845 return msg_id
825 else:
846 else:
826 return list of msg_ids
847 return list of msg_ids
827 ? (should this be dict like block=True) ?
848 ? (should this be dict like block=True) ?
828 else:
849 else:
829 if single target:
850 if single target:
830 return result of f(*args, **kwargs)
851 return result of f(*args, **kwargs)
831 else:
852 else:
832 return dict of results, keyed by engine
853 return dict of results, keyed by engine
833 """
854 """
834
855
835 # defaults:
856 # defaults:
836 block = block if block is not None else self.block
857 block = block if block is not None else self.block
837 args = args if args is not None else []
858 args = args if args is not None else []
838 kwargs = kwargs if kwargs is not None else {}
859 kwargs = kwargs if kwargs is not None else {}
839
860
840 # enforce types of f,args,kwrags
861 # enforce types of f,args,kwrags
841 if not callable(f):
862 if not callable(f):
842 raise TypeError("f must be callable, not %s"%type(f))
863 raise TypeError("f must be callable, not %s"%type(f))
843 if not isinstance(args, (tuple, list)):
864 if not isinstance(args, (tuple, list)):
844 raise TypeError("args must be tuple or list, not %s"%type(args))
865 raise TypeError("args must be tuple or list, not %s"%type(args))
845 if not isinstance(kwargs, dict):
866 if not isinstance(kwargs, dict):
846 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
867 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
847
868
848 if isinstance(after, Dependency):
869 after = self._build_dependency(after)
849 after = after.as_dict()
870 follow = self._build_dependency(follow)
850 elif isinstance(after, AsyncResult):
871
851 after=after.msg_ids
872 options = dict(bound=bound, block=block)
852 elif after is None:
853 after = []
854 if isinstance(follow, Dependency):
855 # if len(follow) > 1 and follow.mode == 'all':
856 # warn("complex follow-dependencies are not rigorously tested for reachability", UserWarning)
857 follow = follow.as_dict()
858 elif isinstance(follow, AsyncResult):
859 follow=follow.msg_ids
860 elif follow is None:
861 follow = []
862 options = dict(bound=bound, block=block, after=after, follow=follow)
863
873
864 if targets is None:
874 if targets is None:
865 return self._apply_balanced(f, args, kwargs, **options)
875 return self._apply_balanced(f, args, kwargs, timeout=timeout,
876 after=after, follow=follow, **options)
866 else:
877 else:
867 return self._apply_direct(f, args, kwargs, targets=targets, **options)
878 return self._apply_direct(f, args, kwargs, targets=targets, **options)
868
879
869 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
880 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
870 after=None, follow=None):
881 after=None, follow=None, timeout=None):
871 """The underlying method for applying functions in a load balanced
882 """The underlying method for applying functions in a load balanced
872 manner, via the task queue."""
883 manner, via the task queue."""
873 subheader = dict(after=after, follow=follow)
884 subheader = dict(after=after, follow=follow, timeout=timeout)
874 bufs = ss.pack_apply_message(f,args,kwargs)
885 bufs = ss.pack_apply_message(f,args,kwargs)
875 content = dict(bound=bound)
886 content = dict(bound=bound)
876
887
877 msg = self.session.send(self._task_socket, "apply_request",
888 msg = self.session.send(self._task_socket, "apply_request",
878 content=content, buffers=bufs, subheader=subheader)
889 content=content, buffers=bufs, subheader=subheader)
879 msg_id = msg['msg_id']
890 msg_id = msg['msg_id']
880 self.outstanding.add(msg_id)
891 self.outstanding.add(msg_id)
881 self.history.append(msg_id)
892 self.history.append(msg_id)
882 ar = AsyncResult(self, [msg_id], fname=f.__name__)
893 ar = AsyncResult(self, [msg_id], fname=f.__name__)
883 if block:
894 if block:
884 return ar.get()
895 return ar.get()
885 else:
896 else:
886 return ar
897 return ar
887
898
888 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
899 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
889 after=None, follow=None):
890 """Then underlying method for applying functions to specific engines
900 """Then underlying method for applying functions to specific engines
891 via the MUX queue."""
901 via the MUX queue."""
892
902
893 queues,targets = self._build_targets(targets)
903 queues,targets = self._build_targets(targets)
894
904
895 subheader = dict(after=after, follow=follow)
905 subheader = dict(after=after, follow=follow)
896 content = dict(bound=bound)
906 content = dict(bound=bound)
897 bufs = ss.pack_apply_message(f,args,kwargs)
907 bufs = ss.pack_apply_message(f,args,kwargs)
898
908
899 msg_ids = []
909 msg_ids = []
900 for queue in queues:
910 for queue in queues:
901 msg = self.session.send(self._mux_socket, "apply_request",
911 msg = self.session.send(self._mux_socket, "apply_request",
902 content=content, buffers=bufs,ident=queue, subheader=subheader)
912 content=content, buffers=bufs,ident=queue, subheader=subheader)
903 msg_id = msg['msg_id']
913 msg_id = msg['msg_id']
904 self.outstanding.add(msg_id)
914 self.outstanding.add(msg_id)
905 self.history.append(msg_id)
915 self.history.append(msg_id)
906 msg_ids.append(msg_id)
916 msg_ids.append(msg_id)
907 ar = AsyncResult(self, msg_ids, fname=f.__name__)
917 ar = AsyncResult(self, msg_ids, fname=f.__name__)
908 if block:
918 if block:
909 return ar.get()
919 return ar.get()
910 else:
920 else:
911 return ar
921 return ar
912
922
913 #--------------------------------------------------------------------------
923 #--------------------------------------------------------------------------
914 # Map and decorators
924 # Map and decorators
915 #--------------------------------------------------------------------------
925 #--------------------------------------------------------------------------
916
926
917 def map(self, f, *sequences):
927 def map(self, f, *sequences):
918 """Parallel version of builtin `map`, using all our engines."""
928 """Parallel version of builtin `map`, using all our engines."""
919 pf = ParallelFunction(self, f, block=self.block,
929 pf = ParallelFunction(self, f, block=self.block,
920 bound=True, targets='all')
930 bound=True, targets='all')
921 return pf.map(*sequences)
931 return pf.map(*sequences)
922
932
923 def parallel(self, bound=True, targets='all', block=True):
933 def parallel(self, bound=True, targets='all', block=True):
924 """Decorator for making a ParallelFunction."""
934 """Decorator for making a ParallelFunction."""
925 return parallel(self, bound=bound, targets=targets, block=block)
935 return parallel(self, bound=bound, targets=targets, block=block)
926
936
927 def remote(self, bound=True, targets='all', block=True):
937 def remote(self, bound=True, targets='all', block=True):
928 """Decorator for making a RemoteFunction."""
938 """Decorator for making a RemoteFunction."""
929 return remote(self, bound=bound, targets=targets, block=block)
939 return remote(self, bound=bound, targets=targets, block=block)
930
940
931 #--------------------------------------------------------------------------
941 #--------------------------------------------------------------------------
932 # Data movement
942 # Data movement
933 #--------------------------------------------------------------------------
943 #--------------------------------------------------------------------------
934
944
935 @defaultblock
945 @defaultblock
936 def push(self, ns, targets='all', block=None):
946 def push(self, ns, targets='all', block=None):
937 """Push the contents of `ns` into the namespace on `target`"""
947 """Push the contents of `ns` into the namespace on `target`"""
938 if not isinstance(ns, dict):
948 if not isinstance(ns, dict):
939 raise TypeError("Must be a dict, not %s"%type(ns))
949 raise TypeError("Must be a dict, not %s"%type(ns))
940 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
950 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
941 return result
951 return result
942
952
943 @defaultblock
953 @defaultblock
944 def pull(self, keys, targets='all', block=None):
954 def pull(self, keys, targets='all', block=None):
945 """Pull objects from `target`'s namespace by `keys`"""
955 """Pull objects from `target`'s namespace by `keys`"""
946 if isinstance(keys, str):
956 if isinstance(keys, str):
947 pass
957 pass
948 elif isinstance(keys, (list,tuple,set)):
958 elif isinstance(keys, (list,tuple,set)):
949 for key in keys:
959 for key in keys:
950 if not isinstance(key, str):
960 if not isinstance(key, str):
951 raise TypeError
961 raise TypeError
952 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
962 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
953 return result
963 return result
954
964
955 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
965 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
956 """
966 """
957 Partition a Python sequence and send the partitions to a set of engines.
967 Partition a Python sequence and send the partitions to a set of engines.
958 """
968 """
959 block = block if block is not None else self.block
969 block = block if block is not None else self.block
960 targets = self._build_targets(targets)[-1]
970 targets = self._build_targets(targets)[-1]
961 mapObject = Map.dists[dist]()
971 mapObject = Map.dists[dist]()
962 nparts = len(targets)
972 nparts = len(targets)
963 msg_ids = []
973 msg_ids = []
964 for index, engineid in enumerate(targets):
974 for index, engineid in enumerate(targets):
965 partition = mapObject.getPartition(seq, index, nparts)
975 partition = mapObject.getPartition(seq, index, nparts)
966 if flatten and len(partition) == 1:
976 if flatten and len(partition) == 1:
967 r = self.push({key: partition[0]}, targets=engineid, block=False)
977 r = self.push({key: partition[0]}, targets=engineid, block=False)
968 else:
978 else:
969 r = self.push({key: partition}, targets=engineid, block=False)
979 r = self.push({key: partition}, targets=engineid, block=False)
970 msg_ids.extend(r.msg_ids)
980 msg_ids.extend(r.msg_ids)
971 r = AsyncResult(self, msg_ids, fname='scatter')
981 r = AsyncResult(self, msg_ids, fname='scatter')
972 if block:
982 if block:
973 return r.get()
983 return r.get()
974 else:
984 else:
975 return r
985 return r
976
986
977 def gather(self, key, dist='b', targets='all', block=None):
987 def gather(self, key, dist='b', targets='all', block=None):
978 """
988 """
979 Gather a partitioned sequence on a set of engines as a single local seq.
989 Gather a partitioned sequence on a set of engines as a single local seq.
980 """
990 """
981 block = block if block is not None else self.block
991 block = block if block is not None else self.block
982
992
983 targets = self._build_targets(targets)[-1]
993 targets = self._build_targets(targets)[-1]
984 mapObject = Map.dists[dist]()
994 mapObject = Map.dists[dist]()
985 msg_ids = []
995 msg_ids = []
986 for index, engineid in enumerate(targets):
996 for index, engineid in enumerate(targets):
987 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
997 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
988
998
989 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
999 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
990 if block:
1000 if block:
991 return r.get()
1001 return r.get()
992 else:
1002 else:
993 return r
1003 return r
994
1004
995 #--------------------------------------------------------------------------
1005 #--------------------------------------------------------------------------
996 # Query methods
1006 # Query methods
997 #--------------------------------------------------------------------------
1007 #--------------------------------------------------------------------------
998
1008
999 @spinfirst
1009 @spinfirst
1000 def get_results(self, msg_ids, status_only=False):
1010 def get_results(self, msg_ids, status_only=False):
1001 """Returns the result of the execute or task request with `msg_ids`.
1011 """Returns the result of the execute or task request with `msg_ids`.
1002
1012
1003 Parameters
1013 Parameters
1004 ----------
1014 ----------
1005 msg_ids : list of ints or msg_ids
1015 msg_ids : list of ints or msg_ids
1006 if int:
1016 if int:
1007 Passed as index to self.history for convenience.
1017 Passed as index to self.history for convenience.
1008 status_only : bool (default: False)
1018 status_only : bool (default: False)
1009 if False:
1019 if False:
1010 return the actual results
1020 return the actual results
1011
1021
1012 Returns
1022 Returns
1013 -------
1023 -------
1014
1024
1015 results : dict
1025 results : dict
1016 There will always be the keys 'pending' and 'completed', which will
1026 There will always be the keys 'pending' and 'completed', which will
1017 be lists of msg_ids.
1027 be lists of msg_ids.
1018 """
1028 """
1019 if not isinstance(msg_ids, (list,tuple)):
1029 if not isinstance(msg_ids, (list,tuple)):
1020 msg_ids = [msg_ids]
1030 msg_ids = [msg_ids]
1021 theids = []
1031 theids = []
1022 for msg_id in msg_ids:
1032 for msg_id in msg_ids:
1023 if isinstance(msg_id, int):
1033 if isinstance(msg_id, int):
1024 msg_id = self.history[msg_id]
1034 msg_id = self.history[msg_id]
1025 if not isinstance(msg_id, str):
1035 if not isinstance(msg_id, str):
1026 raise TypeError("msg_ids must be str, not %r"%msg_id)
1036 raise TypeError("msg_ids must be str, not %r"%msg_id)
1027 theids.append(msg_id)
1037 theids.append(msg_id)
1028
1038
1029 completed = []
1039 completed = []
1030 local_results = {}
1040 local_results = {}
1031 # temporarily disable local shortcut
1041 # temporarily disable local shortcut
1032 # for msg_id in list(theids):
1042 # for msg_id in list(theids):
1033 # if msg_id in self.results:
1043 # if msg_id in self.results:
1034 # completed.append(msg_id)
1044 # completed.append(msg_id)
1035 # local_results[msg_id] = self.results[msg_id]
1045 # local_results[msg_id] = self.results[msg_id]
1036 # theids.remove(msg_id)
1046 # theids.remove(msg_id)
1037
1047
1038 if theids: # some not locally cached
1048 if theids: # some not locally cached
1039 content = dict(msg_ids=theids, status_only=status_only)
1049 content = dict(msg_ids=theids, status_only=status_only)
1040 msg = self.session.send(self._query_socket, "result_request", content=content)
1050 msg = self.session.send(self._query_socket, "result_request", content=content)
1041 zmq.select([self._query_socket], [], [])
1051 zmq.select([self._query_socket], [], [])
1042 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1052 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1043 if self.debug:
1053 if self.debug:
1044 pprint(msg)
1054 pprint(msg)
1045 content = msg['content']
1055 content = msg['content']
1046 if content['status'] != 'ok':
1056 if content['status'] != 'ok':
1047 raise ss.unwrap_exception(content)
1057 raise ss.unwrap_exception(content)
1048 buffers = msg['buffers']
1058 buffers = msg['buffers']
1049 else:
1059 else:
1050 content = dict(completed=[],pending=[])
1060 content = dict(completed=[],pending=[])
1051
1061
1052 content['completed'].extend(completed)
1062 content['completed'].extend(completed)
1053
1063
1054 if status_only:
1064 if status_only:
1055 return content
1065 return content
1056
1066
1057 failures = []
1067 failures = []
1058 # load cached results into result:
1068 # load cached results into result:
1059 content.update(local_results)
1069 content.update(local_results)
1060 # update cache with results:
1070 # update cache with results:
1061 for msg_id in sorted(theids):
1071 for msg_id in sorted(theids):
1062 if msg_id in content['completed']:
1072 if msg_id in content['completed']:
1063 rec = content[msg_id]
1073 rec = content[msg_id]
1064 parent = rec['header']
1074 parent = rec['header']
1065 header = rec['result_header']
1075 header = rec['result_header']
1066 rcontent = rec['result_content']
1076 rcontent = rec['result_content']
1067 iodict = rec['io']
1077 iodict = rec['io']
1068 if isinstance(rcontent, str):
1078 if isinstance(rcontent, str):
1069 rcontent = self.session.unpack(rcontent)
1079 rcontent = self.session.unpack(rcontent)
1070
1080
1071 md = self.metadata.setdefault(msg_id, Metadata())
1081 md = self.metadata.setdefault(msg_id, Metadata())
1072 md.update(self._extract_metadata(header, parent, rcontent))
1082 md.update(self._extract_metadata(header, parent, rcontent))
1073 md.update(iodict)
1083 md.update(iodict)
1074
1084
1075 if rcontent['status'] == 'ok':
1085 if rcontent['status'] == 'ok':
1076 res,buffers = ss.unserialize_object(buffers)
1086 res,buffers = ss.unserialize_object(buffers)
1077 else:
1087 else:
1078 res = ss.unwrap_exception(rcontent)
1088 res = ss.unwrap_exception(rcontent)
1079 failures.append(res)
1089 failures.append(res)
1080
1090
1081 self.results[msg_id] = res
1091 self.results[msg_id] = res
1082 content[msg_id] = res
1092 content[msg_id] = res
1083
1093
1084 error.collect_exceptions(failures, "get_results")
1094 error.collect_exceptions(failures, "get_results")
1085 return content
1095 return content
1086
1096
1087 @spinfirst
1097 @spinfirst
1088 def queue_status(self, targets=None, verbose=False):
1098 def queue_status(self, targets=None, verbose=False):
1089 """Fetch the status of engine queues.
1099 """Fetch the status of engine queues.
1090
1100
1091 Parameters
1101 Parameters
1092 ----------
1102 ----------
1093 targets : int/str/list of ints/strs
1103 targets : int/str/list of ints/strs
1094 the engines on which to execute
1104 the engines on which to execute
1095 default : all
1105 default : all
1096 verbose : bool
1106 verbose : bool
1097 Whether to return lengths only, or lists of ids for each element
1107 Whether to return lengths only, or lists of ids for each element
1098 """
1108 """
1099 targets = self._build_targets(targets)[1]
1109 targets = self._build_targets(targets)[1]
1100 content = dict(targets=targets, verbose=verbose)
1110 content = dict(targets=targets, verbose=verbose)
1101 self.session.send(self._query_socket, "queue_request", content=content)
1111 self.session.send(self._query_socket, "queue_request", content=content)
1102 idents,msg = self.session.recv(self._query_socket, 0)
1112 idents,msg = self.session.recv(self._query_socket, 0)
1103 if self.debug:
1113 if self.debug:
1104 pprint(msg)
1114 pprint(msg)
1105 content = msg['content']
1115 content = msg['content']
1106 status = content.pop('status')
1116 status = content.pop('status')
1107 if status != 'ok':
1117 if status != 'ok':
1108 raise ss.unwrap_exception(content)
1118 raise ss.unwrap_exception(content)
1109 return ss.rekey(content)
1119 return ss.rekey(content)
1110
1120
1111 @spinfirst
1121 @spinfirst
1112 def purge_results(self, msg_ids=[], targets=[]):
1122 def purge_results(self, msg_ids=[], targets=[]):
1113 """Tell the controller to forget results.
1123 """Tell the controller to forget results.
1114
1124
1115 Individual results can be purged by msg_id, or the entire
1125 Individual results can be purged by msg_id, or the entire
1116 history of specific targets can be purged.
1126 history of specific targets can be purged.
1117
1127
1118 Parameters
1128 Parameters
1119 ----------
1129 ----------
1120 msg_ids : str or list of strs
1130 msg_ids : str or list of strs
1121 the msg_ids whose results should be forgotten.
1131 the msg_ids whose results should be forgotten.
1122 targets : int/str/list of ints/strs
1132 targets : int/str/list of ints/strs
1123 The targets, by uuid or int_id, whose entire history is to be purged.
1133 The targets, by uuid or int_id, whose entire history is to be purged.
1124 Use `targets='all'` to scrub everything from the controller's memory.
1134 Use `targets='all'` to scrub everything from the controller's memory.
1125
1135
1126 default : None
1136 default : None
1127 """
1137 """
1128 if not targets and not msg_ids:
1138 if not targets and not msg_ids:
1129 raise ValueError
1139 raise ValueError
1130 if targets:
1140 if targets:
1131 targets = self._build_targets(targets)[1]
1141 targets = self._build_targets(targets)[1]
1132 content = dict(targets=targets, msg_ids=msg_ids)
1142 content = dict(targets=targets, msg_ids=msg_ids)
1133 self.session.send(self._query_socket, "purge_request", content=content)
1143 self.session.send(self._query_socket, "purge_request", content=content)
1134 idents, msg = self.session.recv(self._query_socket, 0)
1144 idents, msg = self.session.recv(self._query_socket, 0)
1135 if self.debug:
1145 if self.debug:
1136 pprint(msg)
1146 pprint(msg)
1137 content = msg['content']
1147 content = msg['content']
1138 if content['status'] != 'ok':
1148 if content['status'] != 'ok':
1139 raise ss.unwrap_exception(content)
1149 raise ss.unwrap_exception(content)
1140
1150
1141 #----------------------------------------
1151 #----------------------------------------
1142 # activate for %px,%autopx magics
1152 # activate for %px,%autopx magics
1143 #----------------------------------------
1153 #----------------------------------------
1144 def activate(self):
1154 def activate(self):
1145 """Make this `View` active for parallel magic commands.
1155 """Make this `View` active for parallel magic commands.
1146
1156
1147 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1157 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1148 In a given IPython session there is a single active one. While
1158 In a given IPython session there is a single active one. While
1149 there can be many `Views` created and used by the user,
1159 there can be many `Views` created and used by the user,
1150 there is only one active one. The active `View` is used whenever
1160 there is only one active one. The active `View` is used whenever
1151 the magic commands %px and %autopx are used.
1161 the magic commands %px and %autopx are used.
1152
1162
1153 The activate() method is called on a given `View` to make it
1163 The activate() method is called on a given `View` to make it
1154 active. Once this has been done, the magic commands can be used.
1164 active. Once this has been done, the magic commands can be used.
1155 """
1165 """
1156
1166
1157 try:
1167 try:
1158 # This is injected into __builtins__.
1168 # This is injected into __builtins__.
1159 ip = get_ipython()
1169 ip = get_ipython()
1160 except NameError:
1170 except NameError:
1161 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1171 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1162 else:
1172 else:
1163 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1173 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1164 if pmagic is not None:
1174 if pmagic is not None:
1165 pmagic.active_multiengine_client = self
1175 pmagic.active_multiengine_client = self
1166 else:
1176 else:
1167 print "You must first load the parallelmagic extension " \
1177 print "You must first load the parallelmagic extension " \
1168 "by doing '%load_ext parallelmagic'"
1178 "by doing '%load_ext parallelmagic'"
1169
1179
1170 class AsynClient(Client):
1180 class AsynClient(Client):
1171 """An Asynchronous client, using the Tornado Event Loop.
1181 """An Asynchronous client, using the Tornado Event Loop.
1172 !!!unfinished!!!"""
1182 !!!unfinished!!!"""
1173 io_loop = None
1183 io_loop = None
1174 _queue_stream = None
1184 _queue_stream = None
1175 _notifier_stream = None
1185 _notifier_stream = None
1176 _task_stream = None
1186 _task_stream = None
1177 _control_stream = None
1187 _control_stream = None
1178
1188
1179 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1189 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1180 Client.__init__(self, addr, context, username, debug)
1190 Client.__init__(self, addr, context, username, debug)
1181 if io_loop is None:
1191 if io_loop is None:
1182 io_loop = ioloop.IOLoop.instance()
1192 io_loop = ioloop.IOLoop.instance()
1183 self.io_loop = io_loop
1193 self.io_loop = io_loop
1184
1194
1185 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1195 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1186 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1196 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1187 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1197 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1188 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1198 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1189
1199
1190 def spin(self):
1200 def spin(self):
1191 for stream in (self.queue_stream, self.notifier_stream,
1201 for stream in (self.queue_stream, self.notifier_stream,
1192 self.task_stream, self.control_stream):
1202 self.task_stream, self.control_stream):
1193 stream.flush()
1203 stream.flush()
1194
1204
1195 __all__ = [ 'Client',
1205 __all__ = [ 'Client',
1196 'depend',
1206 'depend',
1197 'require',
1207 'require',
1198 'remote',
1208 'remote',
1199 'parallel',
1209 'parallel',
1200 'RemoteFunction',
1210 'RemoteFunction',
1201 'ParallelFunction',
1211 'ParallelFunction',
1202 'DirectView',
1212 'DirectView',
1203 'LoadBalancedView',
1213 'LoadBalancedView',
1204 'AsyncResult',
1214 'AsyncResult',
1205 'AsyncMapResult'
1215 'AsyncMapResult'
1206 ]
1216 ]
@@ -1,108 +1,108
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is a collection of one Hub and several Schedulers.
3 This is a collection of one Hub and several Schedulers.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 import logging
17 import logging
18 from multiprocessing import Process
18 from multiprocessing import Process
19
19
20 import zmq
20 import zmq
21
21
22 # internal:
22 # internal:
23 from IPython.utils.importstring import import_item
23 from IPython.utils.importstring import import_item
24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
25
25
26 from entry_point import signal_children
26 from entry_point import signal_children
27
27
28
28
29 from scheduler import launch_scheduler
29 from scheduler import launch_scheduler
30 from hub import Hub, HubFactory
30 from hub import Hub, HubFactory
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Configurable
33 # Configurable
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36
36
37 class ControllerFactory(HubFactory):
37 class ControllerFactory(HubFactory):
38 """Configurable for setting up a Hub and Schedulers."""
38 """Configurable for setting up a Hub and Schedulers."""
39
39
40 scheme = Str('pure', config=True)
40 scheme = Str('pure', config=True)
41 usethreads = Bool(False, config=True)
41 usethreads = Bool(False, config=True)
42
42
43 # internal
43 # internal
44 children = List()
44 children = List()
45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
46
46
47 def _usethreads_changed(self, name, old, new):
47 def _usethreads_changed(self, name, old, new):
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
49
49
50 def __init__(self, **kwargs):
50 def __init__(self, **kwargs):
51 super(ControllerFactory, self).__init__(**kwargs)
51 super(ControllerFactory, self).__init__(**kwargs)
52 self.subconstructors.append(self.construct_schedulers)
52 self.subconstructors.append(self.construct_schedulers)
53
53
54 def start(self):
54 def start(self):
55 super(ControllerFactory, self).start()
55 super(ControllerFactory, self).start()
56 for child in self.children:
56 for child in self.children:
57 child.start()
57 child.start()
58 if not self.usethreads:
58 if not self.usethreads:
59 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
59 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
60
60
61
61
62 def construct_schedulers(self):
62 def construct_schedulers(self):
63 children = self.children
63 children = self.children
64 mq = import_item(self.mq_class)
64 mq = import_item(self.mq_class)
65
65
66 # IOPub relay (in a Process)
66 # IOPub relay (in a Process)
67 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
67 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
68 q.bind_in(self.client_addrs['iopub'])
68 q.bind_in(self.client_addrs['iopub'])
69 q.bind_out(self.engine_addrs['iopub'])
69 q.bind_out(self.engine_addrs['iopub'])
70 q.setsockopt_out(zmq.SUBSCRIBE, '')
70 q.setsockopt_out(zmq.SUBSCRIBE, '')
71 q.connect_mon(self.monitor_url)
71 q.connect_mon(self.monitor_url)
72 q.daemon=True
72 q.daemon=True
73 children.append(q)
73 children.append(q)
74
74
75 # Multiplexer Queue (in a Process)
75 # Multiplexer Queue (in a Process)
76 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
76 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
77 q.bind_in(self.client_addrs['mux'])
77 q.bind_in(self.client_addrs['mux'])
78 q.bind_out(self.engine_addrs['mux'])
78 q.bind_out(self.engine_addrs['mux'])
79 q.connect_mon(self.monitor_url)
79 q.connect_mon(self.monitor_url)
80 q.daemon=True
80 q.daemon=True
81 children.append(q)
81 children.append(q)
82
82
83 # Control Queue (in a Process)
83 # Control Queue (in a Process)
84 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
84 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
85 q.bind_in(self.client_addrs['control'])
85 q.bind_in(self.client_addrs['control'])
86 q.bind_out(self.engine_addrs['control'])
86 q.bind_out(self.engine_addrs['control'])
87 q.connect_mon(self.monitor_url)
87 q.connect_mon(self.monitor_url)
88 q.daemon=True
88 q.daemon=True
89 children.append(q)
89 children.append(q)
90 # Task Queue (in a Process)
90 # Task Queue (in a Process)
91 if self.scheme == 'pure':
91 if self.scheme == 'pure':
92 self.log.warn("task::using pure XREQ Task scheduler")
92 self.log.warn("task::using pure XREQ Task scheduler")
93 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
93 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
94 q.bind_in(self.client_addrs['task'])
94 q.bind_in(self.client_addrs['task'])
95 q.bind_out(self.engine_addrs['task'])
95 q.bind_out(self.engine_addrs['task'])
96 q.connect_mon(self.monitor_url)
96 q.connect_mon(self.monitor_url)
97 q.daemon=True
97 q.daemon=True
98 children.append(q)
98 children.append(q)
99 elif self.scheme == 'none':
99 elif self.scheme == 'none':
100 self.log.warn("task::using no Task scheduler")
100 self.log.warn("task::using no Task scheduler")
101
101
102 else:
102 else:
103 self.log.warn("task::using Python %s Task scheduler"%self.scheme)
103 self.log.info("task::using Python %s Task scheduler"%self.scheme)
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
106 q.daemon=True
106 q.daemon=True
107 children.append(q)
107 children.append(q)
108
108
@@ -1,111 +1,111
1 """Dependency utilities"""
1 """Dependency utilities"""
2
2
3 from IPython.external.decorator import decorator
3 from IPython.external.decorator import decorator
4 from error import UnmetDependency
4 from error import UnmetDependency
5
5
6
6
7 # flags
7 # flags
8 ALL = 1 << 0
8 ALL = 1 << 0
9 ANY = 1 << 1
9 ANY = 1 << 1
10 HERE = 1 << 2
10 HERE = 1 << 2
11 ANYWHERE = 1 << 3
11 ANYWHERE = 1 << 3
12
12
13
13
14 class depend(object):
14 class depend(object):
15 """Dependency decorator, for use with tasks."""
15 """Dependency decorator, for use with tasks."""
16 def __init__(self, f, *args, **kwargs):
16 def __init__(self, f, *args, **kwargs):
17 self.f = f
17 self.f = f
18 self.args = args
18 self.args = args
19 self.kwargs = kwargs
19 self.kwargs = kwargs
20
20
21 def __call__(self, f):
21 def __call__(self, f):
22 return dependent(f, self.f, *self.args, **self.kwargs)
22 return dependent(f, self.f, *self.args, **self.kwargs)
23
23
24 class dependent(object):
24 class dependent(object):
25 """A function that depends on another function.
25 """A function that depends on another function.
26 This is an object to prevent the closure used
26 This is an object to prevent the closure used
27 in traditional decorators, which are not picklable.
27 in traditional decorators, which are not picklable.
28 """
28 """
29
29
30 def __init__(self, f, df, *dargs, **dkwargs):
30 def __init__(self, f, df, *dargs, **dkwargs):
31 self.f = f
31 self.f = f
32 self.func_name = getattr(f, '__name__', 'f')
32 self.func_name = getattr(f, '__name__', 'f')
33 self.df = df
33 self.df = df
34 self.dargs = dargs
34 self.dargs = dargs
35 self.dkwargs = dkwargs
35 self.dkwargs = dkwargs
36
36
37 def __call__(self, *args, **kwargs):
37 def __call__(self, *args, **kwargs):
38 if self.df(*self.dargs, **self.dkwargs) is False:
38 if self.df(*self.dargs, **self.dkwargs) is False:
39 raise UnmetDependency()
39 raise UnmetDependency()
40 return self.f(*args, **kwargs)
40 return self.f(*args, **kwargs)
41
41
42 @property
42 @property
43 def __name__(self):
43 def __name__(self):
44 return self.func_name
44 return self.func_name
45
45
46 def _require(*names):
46 def _require(*names):
47 for name in names:
47 for name in names:
48 try:
48 try:
49 __import__(name)
49 __import__(name)
50 except ImportError:
50 except ImportError:
51 return False
51 return False
52 return True
52 return True
53
53
54 def require(*names):
54 def require(*names):
55 return depend(_require, *names)
55 return depend(_require, *names)
56
56
57 class Dependency(set):
57 class Dependency(set):
58 """An object for representing a set of dependencies.
58 """An object for representing a set of msg_id dependencies.
59
59
60 Subclassed from set()."""
60 Subclassed from set()."""
61
61
62 mode='all'
62 mode='all'
63 success_only=True
63 success_only=True
64
64
65 def __init__(self, dependencies=[], mode='all', success_only=True):
65 def __init__(self, dependencies=[], mode='all', success_only=True):
66 if isinstance(dependencies, dict):
66 if isinstance(dependencies, dict):
67 # load from dict
67 # load from dict
68 mode = dependencies.get('mode', mode)
68 mode = dependencies.get('mode', mode)
69 success_only = dependencies.get('success_only', success_only)
69 success_only = dependencies.get('success_only', success_only)
70 dependencies = dependencies.get('dependencies', [])
70 dependencies = dependencies.get('dependencies', [])
71 set.__init__(self, dependencies)
71 set.__init__(self, dependencies)
72 self.mode = mode.lower()
72 self.mode = mode.lower()
73 self.success_only=success_only
73 self.success_only=success_only
74 if self.mode not in ('any', 'all'):
74 if self.mode not in ('any', 'all'):
75 raise NotImplementedError("Only any|all supported, not %r"%mode)
75 raise NotImplementedError("Only any|all supported, not %r"%mode)
76
76
77 def check(self, completed, failed=None):
77 def check(self, completed, failed=None):
78 if failed is not None and not self.success_only:
78 if failed is not None and not self.success_only:
79 completed = completed.union(failed)
79 completed = completed.union(failed)
80 if len(self) == 0:
80 if len(self) == 0:
81 return True
81 return True
82 if self.mode == 'all':
82 if self.mode == 'all':
83 return self.issubset(completed)
83 return self.issubset(completed)
84 elif self.mode == 'any':
84 elif self.mode == 'any':
85 return not self.isdisjoint(completed)
85 return not self.isdisjoint(completed)
86 else:
86 else:
87 raise NotImplementedError("Only any|all supported, not %r"%mode)
87 raise NotImplementedError("Only any|all supported, not %r"%mode)
88
88
89 def unreachable(self, failed):
89 def unreachable(self, failed):
90 if len(self) == 0 or len(failed) == 0 or not self.success_only:
90 if len(self) == 0 or len(failed) == 0 or not self.success_only:
91 return False
91 return False
92 print self, self.success_only, self.mode, failed
92 print self, self.success_only, self.mode, failed
93 if self.mode == 'all':
93 if self.mode == 'all':
94 return not self.isdisjoint(failed)
94 return not self.isdisjoint(failed)
95 elif self.mode == 'any':
95 elif self.mode == 'any':
96 return self.issubset(failed)
96 return self.issubset(failed)
97 else:
97 else:
98 raise NotImplementedError("Only any|all supported, not %r"%mode)
98 raise NotImplementedError("Only any|all supported, not %r"%mode)
99
99
100
100
101 def as_dict(self):
101 def as_dict(self):
102 """Represent this dependency as a dict. For json compatibility."""
102 """Represent this dependency as a dict. For json compatibility."""
103 return dict(
103 return dict(
104 dependencies=list(self),
104 dependencies=list(self),
105 mode=self.mode,
105 mode=self.mode,
106 success_only=self.success_only,
106 success_only=self.success_only,
107 )
107 )
108
108
109
109
110 __all__ = ['depend', 'require', 'Dependency']
110 __all__ = ['depend', 'require', 'Dependency']
111
111
@@ -1,289 +1,292
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Classes and functions for kernel related errors and exceptions."""
3 """Classes and functions for kernel related errors and exceptions."""
4 from __future__ import print_function
4 from __future__ import print_function
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 # Tell nose to skip this module
8 # Tell nose to skip this module
9 __test__ = {}
9 __test__ = {}
10
10
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
12 # Copyright (C) 2008 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19 # Error classes
19 # Error classes
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21 class IPythonError(Exception):
21 class IPythonError(Exception):
22 """Base exception that all of our exceptions inherit from.
22 """Base exception that all of our exceptions inherit from.
23
23
24 This can be raised by code that doesn't have any more specific
24 This can be raised by code that doesn't have any more specific
25 information."""
25 information."""
26
26
27 pass
27 pass
28
28
29 # Exceptions associated with the controller objects
29 # Exceptions associated with the controller objects
30 class ControllerError(IPythonError): pass
30 class ControllerError(IPythonError): pass
31
31
32 class ControllerCreationError(ControllerError): pass
32 class ControllerCreationError(ControllerError): pass
33
33
34
34
35 # Exceptions associated with the Engines
35 # Exceptions associated with the Engines
36 class EngineError(IPythonError): pass
36 class EngineError(IPythonError): pass
37
37
38 class EngineCreationError(EngineError): pass
38 class EngineCreationError(EngineError): pass
39
39
40 class KernelError(IPythonError):
40 class KernelError(IPythonError):
41 pass
41 pass
42
42
43 class NotDefined(KernelError):
43 class NotDefined(KernelError):
44 def __init__(self, name):
44 def __init__(self, name):
45 self.name = name
45 self.name = name
46 self.args = (name,)
46 self.args = (name,)
47
47
48 def __repr__(self):
48 def __repr__(self):
49 return '<NotDefined: %s>' % self.name
49 return '<NotDefined: %s>' % self.name
50
50
51 __str__ = __repr__
51 __str__ = __repr__
52
52
53
53
54 class QueueCleared(KernelError):
54 class QueueCleared(KernelError):
55 pass
55 pass
56
56
57
57
58 class IdInUse(KernelError):
58 class IdInUse(KernelError):
59 pass
59 pass
60
60
61
61
62 class ProtocolError(KernelError):
62 class ProtocolError(KernelError):
63 pass
63 pass
64
64
65
65
66 class ConnectionError(KernelError):
66 class ConnectionError(KernelError):
67 pass
67 pass
68
68
69
69
70 class InvalidEngineID(KernelError):
70 class InvalidEngineID(KernelError):
71 pass
71 pass
72
72
73
73
74 class NoEnginesRegistered(KernelError):
74 class NoEnginesRegistered(KernelError):
75 pass
75 pass
76
76
77
77
78 class InvalidClientID(KernelError):
78 class InvalidClientID(KernelError):
79 pass
79 pass
80
80
81
81
82 class InvalidDeferredID(KernelError):
82 class InvalidDeferredID(KernelError):
83 pass
83 pass
84
84
85
85
86 class SerializationError(KernelError):
86 class SerializationError(KernelError):
87 pass
87 pass
88
88
89
89
90 class MessageSizeError(KernelError):
90 class MessageSizeError(KernelError):
91 pass
91 pass
92
92
93
93
94 class PBMessageSizeError(MessageSizeError):
94 class PBMessageSizeError(MessageSizeError):
95 pass
95 pass
96
96
97
97
98 class ResultNotCompleted(KernelError):
98 class ResultNotCompleted(KernelError):
99 pass
99 pass
100
100
101
101
102 class ResultAlreadyRetrieved(KernelError):
102 class ResultAlreadyRetrieved(KernelError):
103 pass
103 pass
104
104
105 class ClientError(KernelError):
105 class ClientError(KernelError):
106 pass
106 pass
107
107
108
108
109 class TaskAborted(KernelError):
109 class TaskAborted(KernelError):
110 pass
110 pass
111
111
112
112
113 class TaskTimeout(KernelError):
113 class TaskTimeout(KernelError):
114 pass
114 pass
115
115
116
116
117 class NotAPendingResult(KernelError):
117 class NotAPendingResult(KernelError):
118 pass
118 pass
119
119
120
120
121 class UnpickleableException(KernelError):
121 class UnpickleableException(KernelError):
122 pass
122 pass
123
123
124
124
125 class AbortedPendingDeferredError(KernelError):
125 class AbortedPendingDeferredError(KernelError):
126 pass
126 pass
127
127
128
128
129 class InvalidProperty(KernelError):
129 class InvalidProperty(KernelError):
130 pass
130 pass
131
131
132
132
133 class MissingBlockArgument(KernelError):
133 class MissingBlockArgument(KernelError):
134 pass
134 pass
135
135
136
136
137 class StopLocalExecution(KernelError):
137 class StopLocalExecution(KernelError):
138 pass
138 pass
139
139
140
140
141 class SecurityError(KernelError):
141 class SecurityError(KernelError):
142 pass
142 pass
143
143
144
144
145 class FileTimeoutError(KernelError):
145 class FileTimeoutError(KernelError):
146 pass
146 pass
147
147
148 class TimeoutError(KernelError):
148 class TimeoutError(KernelError):
149 pass
149 pass
150
150
151 class UnmetDependency(KernelError):
151 class UnmetDependency(KernelError):
152 pass
152 pass
153
153
154 class ImpossibleDependency(UnmetDependency):
154 class ImpossibleDependency(UnmetDependency):
155 pass
155 pass
156
156
157 class DependencyTimeout(UnmetDependency):
158 pass
159
157 class RemoteError(KernelError):
160 class RemoteError(KernelError):
158 """Error raised elsewhere"""
161 """Error raised elsewhere"""
159 ename=None
162 ename=None
160 evalue=None
163 evalue=None
161 traceback=None
164 traceback=None
162 engine_info=None
165 engine_info=None
163
166
164 def __init__(self, ename, evalue, traceback, engine_info=None):
167 def __init__(self, ename, evalue, traceback, engine_info=None):
165 self.ename=ename
168 self.ename=ename
166 self.evalue=evalue
169 self.evalue=evalue
167 self.traceback=traceback
170 self.traceback=traceback
168 self.engine_info=engine_info or {}
171 self.engine_info=engine_info or {}
169 self.args=(ename, evalue)
172 self.args=(ename, evalue)
170
173
171 def __repr__(self):
174 def __repr__(self):
172 engineid = self.engine_info.get('engineid', ' ')
175 engineid = self.engine_info.get('engineid', ' ')
173 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
176 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
174
177
175 def __str__(self):
178 def __str__(self):
176 sig = "%s(%s)"%(self.ename, self.evalue)
179 sig = "%s(%s)"%(self.ename, self.evalue)
177 if self.traceback:
180 if self.traceback:
178 return sig + '\n' + self.traceback
181 return sig + '\n' + self.traceback
179 else:
182 else:
180 return sig
183 return sig
181
184
182
185
183 class TaskRejectError(KernelError):
186 class TaskRejectError(KernelError):
184 """Exception to raise when a task should be rejected by an engine.
187 """Exception to raise when a task should be rejected by an engine.
185
188
186 This exception can be used to allow a task running on an engine to test
189 This exception can be used to allow a task running on an engine to test
187 if the engine (or the user's namespace on the engine) has the needed
190 if the engine (or the user's namespace on the engine) has the needed
188 task dependencies. If not, the task should raise this exception. For
191 task dependencies. If not, the task should raise this exception. For
189 the task to be retried on another engine, the task should be created
192 the task to be retried on another engine, the task should be created
190 with the `retries` argument > 1.
193 with the `retries` argument > 1.
191
194
192 The advantage of this approach over our older properties system is that
195 The advantage of this approach over our older properties system is that
193 tasks have full access to the user's namespace on the engines and the
196 tasks have full access to the user's namespace on the engines and the
194 properties don't have to be managed or tested by the controller.
197 properties don't have to be managed or tested by the controller.
195 """
198 """
196
199
197
200
198 class CompositeError(KernelError):
201 class CompositeError(KernelError):
199 """Error for representing possibly multiple errors on engines"""
202 """Error for representing possibly multiple errors on engines"""
200 def __init__(self, message, elist):
203 def __init__(self, message, elist):
201 Exception.__init__(self, *(message, elist))
204 Exception.__init__(self, *(message, elist))
202 # Don't use pack_exception because it will conflict with the .message
205 # Don't use pack_exception because it will conflict with the .message
203 # attribute that is being deprecated in 2.6 and beyond.
206 # attribute that is being deprecated in 2.6 and beyond.
204 self.msg = message
207 self.msg = message
205 self.elist = elist
208 self.elist = elist
206 self.args = [ e[0] for e in elist ]
209 self.args = [ e[0] for e in elist ]
207
210
208 def _get_engine_str(self, ei):
211 def _get_engine_str(self, ei):
209 if not ei:
212 if not ei:
210 return '[Engine Exception]'
213 return '[Engine Exception]'
211 else:
214 else:
212 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
215 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
213
216
214 def _get_traceback(self, ev):
217 def _get_traceback(self, ev):
215 try:
218 try:
216 tb = ev._ipython_traceback_text
219 tb = ev._ipython_traceback_text
217 except AttributeError:
220 except AttributeError:
218 return 'No traceback available'
221 return 'No traceback available'
219 else:
222 else:
220 return tb
223 return tb
221
224
222 def __str__(self):
225 def __str__(self):
223 s = str(self.msg)
226 s = str(self.msg)
224 for en, ev, etb, ei in self.elist:
227 for en, ev, etb, ei in self.elist:
225 engine_str = self._get_engine_str(ei)
228 engine_str = self._get_engine_str(ei)
226 s = s + '\n' + engine_str + en + ': ' + str(ev)
229 s = s + '\n' + engine_str + en + ': ' + str(ev)
227 return s
230 return s
228
231
229 def __repr__(self):
232 def __repr__(self):
230 return "CompositeError(%i)"%len(self.elist)
233 return "CompositeError(%i)"%len(self.elist)
231
234
232 def print_tracebacks(self, excid=None):
235 def print_tracebacks(self, excid=None):
233 if excid is None:
236 if excid is None:
234 for (en,ev,etb,ei) in self.elist:
237 for (en,ev,etb,ei) in self.elist:
235 print (self._get_engine_str(ei))
238 print (self._get_engine_str(ei))
236 print (etb or 'No traceback available')
239 print (etb or 'No traceback available')
237 print ()
240 print ()
238 else:
241 else:
239 try:
242 try:
240 en,ev,etb,ei = self.elist[excid]
243 en,ev,etb,ei = self.elist[excid]
241 except:
244 except:
242 raise IndexError("an exception with index %i does not exist"%excid)
245 raise IndexError("an exception with index %i does not exist"%excid)
243 else:
246 else:
244 print (self._get_engine_str(ei))
247 print (self._get_engine_str(ei))
245 print (etb or 'No traceback available')
248 print (etb or 'No traceback available')
246
249
247 def raise_exception(self, excid=0):
250 def raise_exception(self, excid=0):
248 try:
251 try:
249 en,ev,etb,ei = self.elist[excid]
252 en,ev,etb,ei = self.elist[excid]
250 except:
253 except:
251 raise IndexError("an exception with index %i does not exist"%excid)
254 raise IndexError("an exception with index %i does not exist"%excid)
252 else:
255 else:
253 try:
256 try:
254 raise RemoteError(en, ev, etb, ei)
257 raise RemoteError(en, ev, etb, ei)
255 except:
258 except:
256 et,ev,tb = sys.exc_info()
259 et,ev,tb = sys.exc_info()
257
260
258
261
259 def collect_exceptions(rdict_or_list, method='unspecified'):
262 def collect_exceptions(rdict_or_list, method='unspecified'):
260 """check a result dict for errors, and raise CompositeError if any exist.
263 """check a result dict for errors, and raise CompositeError if any exist.
261 Passthrough otherwise."""
264 Passthrough otherwise."""
262 elist = []
265 elist = []
263 if isinstance(rdict_or_list, dict):
266 if isinstance(rdict_or_list, dict):
264 rlist = rdict_or_list.values()
267 rlist = rdict_or_list.values()
265 else:
268 else:
266 rlist = rdict_or_list
269 rlist = rdict_or_list
267 for r in rlist:
270 for r in rlist:
268 if isinstance(r, RemoteError):
271 if isinstance(r, RemoteError):
269 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
272 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
270 # Sometimes we could have CompositeError in our list. Just take
273 # Sometimes we could have CompositeError in our list. Just take
271 # the errors out of them and put them in our new list. This
274 # the errors out of them and put them in our new list. This
272 # has the effect of flattening lists of CompositeErrors into one
275 # has the effect of flattening lists of CompositeErrors into one
273 # CompositeError
276 # CompositeError
274 if en=='CompositeError':
277 if en=='CompositeError':
275 for e in ev.elist:
278 for e in ev.elist:
276 elist.append(e)
279 elist.append(e)
277 else:
280 else:
278 elist.append((en, ev, etb, ei))
281 elist.append((en, ev, etb, ei))
279 if len(elist)==0:
282 if len(elist)==0:
280 return rdict_or_list
283 return rdict_or_list
281 else:
284 else:
282 msg = "one or more exceptions from call to method: %s" % (method)
285 msg = "one or more exceptions from call to method: %s" % (method)
283 # This silliness is needed so the debugger has access to the exception
286 # This silliness is needed so the debugger has access to the exception
284 # instance (e in this case)
287 # instance (e in this case)
285 try:
288 try:
286 raise CompositeError(msg, elist)
289 raise CompositeError(msg, elist)
287 except CompositeError, e:
290 except CompositeError, e:
288 raise e
291 raise e
289
292
@@ -1,511 +1,526
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7
7
8 #----------------------------------------------------------------------
8 #----------------------------------------------------------------------
9 # Imports
9 # Imports
10 #----------------------------------------------------------------------
10 #----------------------------------------------------------------------
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13 import sys
13 import sys
14 import logging
14 import logging
15 from random import randint,random
15 from random import randint, random
16 from types import FunctionType
16 from types import FunctionType
17
17 from datetime import datetime, timedelta
18 try:
18 try:
19 import numpy
19 import numpy
20 except ImportError:
20 except ImportError:
21 numpy = None
21 numpy = None
22
22
23 import zmq
23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 from zmq.eventloop import ioloop, zmqstream
25
25
26 # local imports
26 # local imports
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 # from IPython.config.configurable import Configurable
28 # from IPython.config.configurable import Configurable
29 from IPython.utils.traitlets import Instance, Dict, List, Set
29 from IPython.utils.traitlets import Instance, Dict, List, Set
30
30
31 import error
31 import error
32 from client import Client
32 # from client import Client
33 from dependency import Dependency
33 from dependency import Dependency
34 import streamsession as ss
34 import streamsession as ss
35 from entry_point import connect_logger, local_logger
35 from entry_point import connect_logger, local_logger
36 from factory import LoggingFactory
36 from factory import SessionFactory
37
37
38
38
39 @decorator
39 @decorator
40 def logged(f,self,*args,**kwargs):
40 def logged(f,self,*args,**kwargs):
41 # print ("#--------------------")
41 # print ("#--------------------")
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
43 # print ("#--")
43 # print ("#--")
44 return f(self,*args, **kwargs)
44 return f(self,*args, **kwargs)
45
45
46 #----------------------------------------------------------------------
46 #----------------------------------------------------------------------
47 # Chooser functions
47 # Chooser functions
48 #----------------------------------------------------------------------
48 #----------------------------------------------------------------------
49
49
50 def plainrandom(loads):
50 def plainrandom(loads):
51 """Plain random pick."""
51 """Plain random pick."""
52 n = len(loads)
52 n = len(loads)
53 return randint(0,n-1)
53 return randint(0,n-1)
54
54
55 def lru(loads):
55 def lru(loads):
56 """Always pick the front of the line.
56 """Always pick the front of the line.
57
57
58 The content of `loads` is ignored.
58 The content of `loads` is ignored.
59
59
60 Assumes LRU ordering of loads, with oldest first.
60 Assumes LRU ordering of loads, with oldest first.
61 """
61 """
62 return 0
62 return 0
63
63
64 def twobin(loads):
64 def twobin(loads):
65 """Pick two at random, use the LRU of the two.
65 """Pick two at random, use the LRU of the two.
66
66
67 The content of loads is ignored.
67 The content of loads is ignored.
68
68
69 Assumes LRU ordering of loads, with oldest first.
69 Assumes LRU ordering of loads, with oldest first.
70 """
70 """
71 n = len(loads)
71 n = len(loads)
72 a = randint(0,n-1)
72 a = randint(0,n-1)
73 b = randint(0,n-1)
73 b = randint(0,n-1)
74 return min(a,b)
74 return min(a,b)
75
75
76 def weighted(loads):
76 def weighted(loads):
77 """Pick two at random using inverse load as weight.
77 """Pick two at random using inverse load as weight.
78
78
79 Return the less loaded of the two.
79 Return the less loaded of the two.
80 """
80 """
81 # weight 0 a million times more than 1:
81 # weight 0 a million times more than 1:
82 weights = 1./(1e-6+numpy.array(loads))
82 weights = 1./(1e-6+numpy.array(loads))
83 sums = weights.cumsum()
83 sums = weights.cumsum()
84 t = sums[-1]
84 t = sums[-1]
85 x = random()*t
85 x = random()*t
86 y = random()*t
86 y = random()*t
87 idx = 0
87 idx = 0
88 idy = 0
88 idy = 0
89 while sums[idx] < x:
89 while sums[idx] < x:
90 idx += 1
90 idx += 1
91 while sums[idy] < y:
91 while sums[idy] < y:
92 idy += 1
92 idy += 1
93 if weights[idy] > weights[idx]:
93 if weights[idy] > weights[idx]:
94 return idy
94 return idy
95 else:
95 else:
96 return idx
96 return idx
97
97
98 def leastload(loads):
98 def leastload(loads):
99 """Always choose the lowest load.
99 """Always choose the lowest load.
100
100
101 If the lowest load occurs more than once, the first
101 If the lowest load occurs more than once, the first
102 occurance will be used. If loads has LRU ordering, this means
102 occurance will be used. If loads has LRU ordering, this means
103 the LRU of those with the lowest load is chosen.
103 the LRU of those with the lowest load is chosen.
104 """
104 """
105 return loads.index(min(loads))
105 return loads.index(min(loads))
106
106
107 #---------------------------------------------------------------------
107 #---------------------------------------------------------------------
108 # Classes
108 # Classes
109 #---------------------------------------------------------------------
109 #---------------------------------------------------------------------
110 # store empty default dependency:
110 # store empty default dependency:
111 MET = Dependency([])
111 MET = Dependency([])
112
112
113 class TaskScheduler(LoggingFactory):
113 class TaskScheduler(SessionFactory):
114 """Python TaskScheduler object.
114 """Python TaskScheduler object.
115
115
116 This is the simplest object that supports msg_id based
116 This is the simplest object that supports msg_id based
117 DAG dependencies. *Only* task msg_ids are checked, not
117 DAG dependencies. *Only* task msg_ids are checked, not
118 msg_ids of jobs submitted via the MUX queue.
118 msg_ids of jobs submitted via the MUX queue.
119
119
120 """
120 """
121
121
122 # input arguments:
122 # input arguments:
123 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
123 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
124 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
124 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128 io_loop = Instance(ioloop.IOLoop)
129
128
130 # internals:
129 # internals:
131 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
132 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
133 pending = Dict() # dict by engine_uuid of submitted tasks
132 pending = Dict() # dict by engine_uuid of submitted tasks
134 completed = Dict() # dict by engine_uuid of completed tasks
133 completed = Dict() # dict by engine_uuid of completed tasks
135 failed = Dict() # dict by engine_uuid of failed tasks
134 failed = Dict() # dict by engine_uuid of failed tasks
136 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
135 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
137 clients = Dict() # dict by msg_id for who submitted the task
136 clients = Dict() # dict by msg_id for who submitted the task
138 targets = List() # list of target IDENTs
137 targets = List() # list of target IDENTs
139 loads = List() # list of engine loads
138 loads = List() # list of engine loads
140 all_completed = Set() # set of all completed tasks
139 all_completed = Set() # set of all completed tasks
141 all_failed = Set() # set of all failed tasks
140 all_failed = Set() # set of all failed tasks
142 all_done = Set() # set of all finished tasks=union(completed,failed)
141 all_done = Set() # set of all finished tasks=union(completed,failed)
143 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
142 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
144 session = Instance(ss.StreamSession)
143 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
145
146
147 def __init__(self, **kwargs):
148 super(TaskScheduler, self).__init__(**kwargs)
149
144
150 self.session = ss.StreamSession(username="TaskScheduler")
151
145
146 def start(self):
152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
147 self.engine_stream.on_recv(self.dispatch_result, copy=False)
153 self._notification_handlers = dict(
148 self._notification_handlers = dict(
154 registration_notification = self._register_engine,
149 registration_notification = self._register_engine,
155 unregistration_notification = self._unregister_engine
150 unregistration_notification = self._unregister_engine
156 )
151 )
157 self.notifier_stream.on_recv(self.dispatch_notification)
152 self.notifier_stream.on_recv(self.dispatch_notification)
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz
154 self.auditor.start()
158 self.log.info("Scheduler started...%r"%self)
155 self.log.info("Scheduler started...%r"%self)
159
156
160 def resume_receiving(self):
157 def resume_receiving(self):
161 """Resume accepting jobs."""
158 """Resume accepting jobs."""
162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
159 self.client_stream.on_recv(self.dispatch_submission, copy=False)
163
160
164 def stop_receiving(self):
161 def stop_receiving(self):
165 """Stop accepting jobs while there are no engines.
162 """Stop accepting jobs while there are no engines.
166 Leave them in the ZMQ queue."""
163 Leave them in the ZMQ queue."""
167 self.client_stream.on_recv(None)
164 self.client_stream.on_recv(None)
168
165
169 #-----------------------------------------------------------------------
166 #-----------------------------------------------------------------------
170 # [Un]Registration Handling
167 # [Un]Registration Handling
171 #-----------------------------------------------------------------------
168 #-----------------------------------------------------------------------
172
169
173 def dispatch_notification(self, msg):
170 def dispatch_notification(self, msg):
174 """dispatch register/unregister events."""
171 """dispatch register/unregister events."""
175 idents,msg = self.session.feed_identities(msg)
172 idents,msg = self.session.feed_identities(msg)
176 msg = self.session.unpack_message(msg)
173 msg = self.session.unpack_message(msg)
177 msg_type = msg['msg_type']
174 msg_type = msg['msg_type']
178 handler = self._notification_handlers.get(msg_type, None)
175 handler = self._notification_handlers.get(msg_type, None)
179 if handler is None:
176 if handler is None:
180 raise Exception("Unhandled message type: %s"%msg_type)
177 raise Exception("Unhandled message type: %s"%msg_type)
181 else:
178 else:
182 try:
179 try:
183 handler(str(msg['content']['queue']))
180 handler(str(msg['content']['queue']))
184 except KeyError:
181 except KeyError:
185 self.log.error("task::Invalid notification msg: %s"%msg)
182 self.log.error("task::Invalid notification msg: %s"%msg)
186
183
187 @logged
184 @logged
188 def _register_engine(self, uid):
185 def _register_engine(self, uid):
189 """New engine with ident `uid` became available."""
186 """New engine with ident `uid` became available."""
190 # head of the line:
187 # head of the line:
191 self.targets.insert(0,uid)
188 self.targets.insert(0,uid)
192 self.loads.insert(0,0)
189 self.loads.insert(0,0)
193 # initialize sets
190 # initialize sets
194 self.completed[uid] = set()
191 self.completed[uid] = set()
195 self.failed[uid] = set()
192 self.failed[uid] = set()
196 self.pending[uid] = {}
193 self.pending[uid] = {}
197 if len(self.targets) == 1:
194 if len(self.targets) == 1:
198 self.resume_receiving()
195 self.resume_receiving()
199
196
200 def _unregister_engine(self, uid):
197 def _unregister_engine(self, uid):
201 """Existing engine with ident `uid` became unavailable."""
198 """Existing engine with ident `uid` became unavailable."""
202 if len(self.targets) == 1:
199 if len(self.targets) == 1:
203 # this was our only engine
200 # this was our only engine
204 self.stop_receiving()
201 self.stop_receiving()
205
202
206 # handle any potentially finished tasks:
203 # handle any potentially finished tasks:
207 self.engine_stream.flush()
204 self.engine_stream.flush()
208
205
209 self.completed.pop(uid)
206 self.completed.pop(uid)
210 self.failed.pop(uid)
207 self.failed.pop(uid)
211 # don't pop destinations, because it might be used later
208 # don't pop destinations, because it might be used later
212 # map(self.destinations.pop, self.completed.pop(uid))
209 # map(self.destinations.pop, self.completed.pop(uid))
213 # map(self.destinations.pop, self.failed.pop(uid))
210 # map(self.destinations.pop, self.failed.pop(uid))
214
211
215 lost = self.pending.pop(uid)
212 lost = self.pending.pop(uid)
216
213
217 idx = self.targets.index(uid)
214 idx = self.targets.index(uid)
218 self.targets.pop(idx)
215 self.targets.pop(idx)
219 self.loads.pop(idx)
216 self.loads.pop(idx)
220
217
221 self.handle_stranded_tasks(lost)
218 self.handle_stranded_tasks(lost)
222
219
223 def handle_stranded_tasks(self, lost):
220 def handle_stranded_tasks(self, lost):
224 """Deal with jobs resident in an engine that died."""
221 """Deal with jobs resident in an engine that died."""
225 # TODO: resubmit the tasks?
222 # TODO: resubmit the tasks?
226 for msg_id in lost:
223 for msg_id in lost:
227 pass
224 pass
228
225
229
226
230 #-----------------------------------------------------------------------
227 #-----------------------------------------------------------------------
231 # Job Submission
228 # Job Submission
232 #-----------------------------------------------------------------------
229 #-----------------------------------------------------------------------
233 @logged
230 @logged
234 def dispatch_submission(self, raw_msg):
231 def dispatch_submission(self, raw_msg):
235 """Dispatch job submission to appropriate handlers."""
232 """Dispatch job submission to appropriate handlers."""
236 # ensure targets up to date:
233 # ensure targets up to date:
237 self.notifier_stream.flush()
234 self.notifier_stream.flush()
238 try:
235 try:
239 idents, msg = self.session.feed_identities(raw_msg, copy=False)
236 idents, msg = self.session.feed_identities(raw_msg, copy=False)
240 except Exception as e:
237 except Exception as e:
241 self.log.error("task::Invaid msg: %s"%msg)
238 self.log.error("task::Invaid msg: %s"%msg)
242 return
239 return
243
240
244 # send to monitor
241 # send to monitor
245 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
242 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
246
243
247 msg = self.session.unpack_message(msg, content=False, copy=False)
244 msg = self.session.unpack_message(msg, content=False, copy=False)
248 header = msg['header']
245 header = msg['header']
249 msg_id = header['msg_id']
246 msg_id = header['msg_id']
250
247
251 # time dependencies
248 # time dependencies
252 after = Dependency(header.get('after', []))
249 after = Dependency(header.get('after', []))
253 if after.mode == 'all':
250 if after.mode == 'all':
254 after.difference_update(self.all_completed)
251 after.difference_update(self.all_completed)
255 if not after.success_only:
252 if not after.success_only:
256 after.difference_update(self.all_failed)
253 after.difference_update(self.all_failed)
257 if after.check(self.all_completed, self.all_failed):
254 if after.check(self.all_completed, self.all_failed):
258 # recast as empty set, if `after` already met,
255 # recast as empty set, if `after` already met,
259 # to prevent unnecessary set comparisons
256 # to prevent unnecessary set comparisons
260 after = MET
257 after = MET
261
258
262 # location dependencies
259 # location dependencies
263 follow = Dependency(header.get('follow', []))
260 follow = Dependency(header.get('follow', []))
264
265 # check if unreachable:
261 # check if unreachable:
266 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
262 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
267 self.depending[msg_id] = [raw_msg,MET,MET]
263 self.depending[msg_id] = [raw_msg,MET,MET,None]
268 return self.fail_unreachable(msg_id)
264 return self.fail_unreachable(msg_id)
269
265
266 # turn timeouts into datetime objects:
267 timeout = header.get('timeout', None)
268 if timeout:
269 timeout = datetime.now() + timedelta(0,timeout,0)
270
270 if after.check(self.all_completed, self.all_failed):
271 if after.check(self.all_completed, self.all_failed):
271 # time deps already met, try to run
272 # time deps already met, try to run
272 if not self.maybe_run(msg_id, raw_msg, follow):
273 if not self.maybe_run(msg_id, raw_msg, follow):
273 # can't run yet
274 # can't run yet
274 self.save_unmet(msg_id, raw_msg, after, follow)
275 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
275 else:
276 else:
276 self.save_unmet(msg_id, raw_msg, after, follow)
277 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
277
278
278 @logged
279 @logged
279 def fail_unreachable(self, msg_id):
280 def audit_timeouts(self):
281 """Audit all waiting tasks for expired timeouts."""
282 now = datetime.now()
283 for msg_id in self.depending.keys():
284 # must recheck, in case one failure cascaded to another:
285 if msg_id in self.depending:
286 raw,after,follow,timeout = self.depending[msg_id]
287 if timeout and timeout < now:
288 self.fail_unreachable(msg_id, timeout=True)
289
290 @logged
291 def fail_unreachable(self, msg_id, timeout=False):
280 """a message has become unreachable"""
292 """a message has become unreachable"""
281 if msg_id not in self.depending:
293 if msg_id not in self.depending:
282 self.log.error("msg %r already failed!"%msg_id)
294 self.log.error("msg %r already failed!"%msg_id)
283 return
295 return
284 raw_msg, after, follow = self.depending.pop(msg_id)
296 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
285 for mid in follow.union(after):
297 for mid in follow.union(after):
286 if mid in self.dependencies:
298 if mid in self.dependencies:
287 self.dependencies[mid].remove(msg_id)
299 self.dependencies[mid].remove(msg_id)
288
300
301 # FIXME: unpacking a message I've already unpacked, but didn't save:
289 idents,msg = self.session.feed_identities(raw_msg, copy=False)
302 idents,msg = self.session.feed_identities(raw_msg, copy=False)
290 msg = self.session.unpack_message(msg, copy=False, content=False)
303 msg = self.session.unpack_message(msg, copy=False, content=False)
291 header = msg['header']
304 header = msg['header']
292
305
306 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
307
293 try:
308 try:
294 raise error.ImpossibleDependency()
309 raise impossible()
295 except:
310 except:
296 content = ss.wrap_exception()
311 content = ss.wrap_exception()
297
312
298 self.all_done.add(msg_id)
313 self.all_done.add(msg_id)
299 self.all_failed.add(msg_id)
314 self.all_failed.add(msg_id)
300
315
301 msg = self.session.send(self.client_stream, 'apply_reply', content,
316 msg = self.session.send(self.client_stream, 'apply_reply', content,
302 parent=header, ident=idents)
317 parent=header, ident=idents)
303 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
318 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
304
319
305 self.update_dependencies(msg_id, success=False)
320 self.update_dependencies(msg_id, success=False)
306
321
307 @logged
322 @logged
308 def maybe_run(self, msg_id, raw_msg, follow=None):
323 def maybe_run(self, msg_id, raw_msg, follow=None):
309 """check location dependencies, and run if they are met."""
324 """check location dependencies, and run if they are met."""
310
325
311 if follow:
326 if follow:
312 def can_run(idx):
327 def can_run(idx):
313 target = self.targets[idx]
328 target = self.targets[idx]
314 return target not in self.blacklist.get(msg_id, []) and\
329 return target not in self.blacklist.get(msg_id, []) and\
315 follow.check(self.completed[target], self.failed[target])
330 follow.check(self.completed[target], self.failed[target])
316
331
317 indices = filter(can_run, range(len(self.targets)))
332 indices = filter(can_run, range(len(self.targets)))
318 if not indices:
333 if not indices:
319 # TODO evaluate unmeetable follow dependencies
334 # TODO evaluate unmeetable follow dependencies
320 if follow.mode == 'all':
335 if follow.mode == 'all':
321 dests = set()
336 dests = set()
322 relevant = self.all_completed if follow.success_only else self.all_done
337 relevant = self.all_completed if follow.success_only else self.all_done
323 for m in follow.intersection(relevant):
338 for m in follow.intersection(relevant):
324 dests.add(self.destinations[m])
339 dests.add(self.destinations[m])
325 if len(dests) > 1:
340 if len(dests) > 1:
326 self.fail_unreachable(msg_id)
341 self.fail_unreachable(msg_id)
327
342
328
343
329 return False
344 return False
330 else:
345 else:
331 indices = None
346 indices = None
332
347
333 self.submit_task(msg_id, raw_msg, indices)
348 self.submit_task(msg_id, raw_msg, indices)
334 return True
349 return True
335
350
336 @logged
351 @logged
337 def save_unmet(self, msg_id, raw_msg, after, follow):
352 def save_unmet(self, msg_id, raw_msg, after, follow, timeout):
338 """Save a message for later submission when its dependencies are met."""
353 """Save a message for later submission when its dependencies are met."""
339 self.depending[msg_id] = [raw_msg,after,follow]
354 self.depending[msg_id] = [raw_msg,after,follow,timeout]
340 # track the ids in follow or after, but not those already finished
355 # track the ids in follow or after, but not those already finished
341 for dep_id in after.union(follow).difference(self.all_done):
356 for dep_id in after.union(follow).difference(self.all_done):
342 if dep_id not in self.dependencies:
357 if dep_id not in self.dependencies:
343 self.dependencies[dep_id] = set()
358 self.dependencies[dep_id] = set()
344 self.dependencies[dep_id].add(msg_id)
359 self.dependencies[dep_id].add(msg_id)
345
360
346 @logged
361 @logged
347 def submit_task(self, msg_id, msg, follow=None, indices=None):
362 def submit_task(self, msg_id, msg, follow=None, indices=None):
348 """Submit a task to any of a subset of our targets."""
363 """Submit a task to any of a subset of our targets."""
349 if indices:
364 if indices:
350 loads = [self.loads[i] for i in indices]
365 loads = [self.loads[i] for i in indices]
351 else:
366 else:
352 loads = self.loads
367 loads = self.loads
353 idx = self.scheme(loads)
368 idx = self.scheme(loads)
354 if indices:
369 if indices:
355 idx = indices[idx]
370 idx = indices[idx]
356 target = self.targets[idx]
371 target = self.targets[idx]
357 # print (target, map(str, msg[:3]))
372 # print (target, map(str, msg[:3]))
358 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
373 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
359 self.engine_stream.send_multipart(msg, copy=False)
374 self.engine_stream.send_multipart(msg, copy=False)
360 self.add_job(idx)
375 self.add_job(idx)
361 self.pending[target][msg_id] = (msg, follow)
376 self.pending[target][msg_id] = (msg, follow)
362 content = dict(msg_id=msg_id, engine_id=target)
377 content = dict(msg_id=msg_id, engine_id=target)
363 self.session.send(self.mon_stream, 'task_destination', content=content,
378 self.session.send(self.mon_stream, 'task_destination', content=content,
364 ident=['tracktask',self.session.session])
379 ident=['tracktask',self.session.session])
365
380
366 #-----------------------------------------------------------------------
381 #-----------------------------------------------------------------------
367 # Result Handling
382 # Result Handling
368 #-----------------------------------------------------------------------
383 #-----------------------------------------------------------------------
369 @logged
384 @logged
370 def dispatch_result(self, raw_msg):
385 def dispatch_result(self, raw_msg):
371 try:
386 try:
372 idents,msg = self.session.feed_identities(raw_msg, copy=False)
387 idents,msg = self.session.feed_identities(raw_msg, copy=False)
373 except Exception as e:
388 except Exception as e:
374 self.log.error("task::Invaid result: %s"%msg)
389 self.log.error("task::Invaid result: %s"%msg)
375 return
390 return
376 msg = self.session.unpack_message(msg, content=False, copy=False)
391 msg = self.session.unpack_message(msg, content=False, copy=False)
377 header = msg['header']
392 header = msg['header']
378 if header.get('dependencies_met', True):
393 if header.get('dependencies_met', True):
379 success = (header['status'] == 'ok')
394 success = (header['status'] == 'ok')
380 self.handle_result(idents, msg['parent_header'], raw_msg, success)
395 self.handle_result(idents, msg['parent_header'], raw_msg, success)
381 # send to Hub monitor
396 # send to Hub monitor
382 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
397 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
383 else:
398 else:
384 self.handle_unmet_dependency(idents, msg['parent_header'])
399 self.handle_unmet_dependency(idents, msg['parent_header'])
385
400
386 @logged
401 @logged
387 def handle_result(self, idents, parent, raw_msg, success=True):
402 def handle_result(self, idents, parent, raw_msg, success=True):
388 # first, relay result to client
403 # first, relay result to client
389 engine = idents[0]
404 engine = idents[0]
390 client = idents[1]
405 client = idents[1]
391 # swap_ids for XREP-XREP mirror
406 # swap_ids for XREP-XREP mirror
392 raw_msg[:2] = [client,engine]
407 raw_msg[:2] = [client,engine]
393 # print (map(str, raw_msg[:4]))
408 # print (map(str, raw_msg[:4]))
394 self.client_stream.send_multipart(raw_msg, copy=False)
409 self.client_stream.send_multipart(raw_msg, copy=False)
395 # now, update our data structures
410 # now, update our data structures
396 msg_id = parent['msg_id']
411 msg_id = parent['msg_id']
397 self.pending[engine].pop(msg_id)
412 self.pending[engine].pop(msg_id)
398 if success:
413 if success:
399 self.completed[engine].add(msg_id)
414 self.completed[engine].add(msg_id)
400 self.all_completed.add(msg_id)
415 self.all_completed.add(msg_id)
401 else:
416 else:
402 self.failed[engine].add(msg_id)
417 self.failed[engine].add(msg_id)
403 self.all_failed.add(msg_id)
418 self.all_failed.add(msg_id)
404 self.all_done.add(msg_id)
419 self.all_done.add(msg_id)
405 self.destinations[msg_id] = engine
420 self.destinations[msg_id] = engine
406
421
407 self.update_dependencies(msg_id, success)
422 self.update_dependencies(msg_id, success)
408
423
409 @logged
424 @logged
410 def handle_unmet_dependency(self, idents, parent):
425 def handle_unmet_dependency(self, idents, parent):
411 engine = idents[0]
426 engine = idents[0]
412 msg_id = parent['msg_id']
427 msg_id = parent['msg_id']
413 if msg_id not in self.blacklist:
428 if msg_id not in self.blacklist:
414 self.blacklist[msg_id] = set()
429 self.blacklist[msg_id] = set()
415 self.blacklist[msg_id].add(engine)
430 self.blacklist[msg_id].add(engine)
416 raw_msg,follow = self.pending[engine].pop(msg_id)
431 raw_msg,follow,timeout = self.pending[engine].pop(msg_id)
417 if not self.maybe_run(msg_id, raw_msg, follow):
432 if not self.maybe_run(msg_id, raw_msg, follow):
418 # resubmit failed, put it back in our dependency tree
433 # resubmit failed, put it back in our dependency tree
419 self.save_unmet(msg_id, raw_msg, MET, follow)
434 self.save_unmet(msg_id, raw_msg, MET, follow, timeout)
420 pass
435 pass
421
436
422 @logged
437 @logged
423 def update_dependencies(self, dep_id, success=True):
438 def update_dependencies(self, dep_id, success=True):
424 """dep_id just finished. Update our dependency
439 """dep_id just finished. Update our dependency
425 table and submit any jobs that just became runable."""
440 table and submit any jobs that just became runable."""
426 # print ("\n\n***********")
441 # print ("\n\n***********")
427 # pprint (dep_id)
442 # pprint (dep_id)
428 # pprint (self.dependencies)
443 # pprint (self.dependencies)
429 # pprint (self.depending)
444 # pprint (self.depending)
430 # pprint (self.all_completed)
445 # pprint (self.all_completed)
431 # pprint (self.all_failed)
446 # pprint (self.all_failed)
432 # print ("\n\n***********\n\n")
447 # print ("\n\n***********\n\n")
433 if dep_id not in self.dependencies:
448 if dep_id not in self.dependencies:
434 return
449 return
435 jobs = self.dependencies.pop(dep_id)
450 jobs = self.dependencies.pop(dep_id)
436
451
437 for msg_id in jobs:
452 for msg_id in jobs:
438 raw_msg, after, follow = self.depending[msg_id]
453 raw_msg, after, follow, timeout = self.depending[msg_id]
439 # if dep_id in after:
454 # if dep_id in after:
440 # if after.mode == 'all' and (success or not after.success_only):
455 # if after.mode == 'all' and (success or not after.success_only):
441 # after.remove(dep_id)
456 # after.remove(dep_id)
442
457
443 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
458 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
444 self.fail_unreachable(msg_id)
459 self.fail_unreachable(msg_id)
445
460
446 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
461 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
447 self.depending[msg_id][1] = MET
462 self.depending[msg_id][1] = MET
448 if self.maybe_run(msg_id, raw_msg, follow):
463 if self.maybe_run(msg_id, raw_msg, follow):
449
464
450 self.depending.pop(msg_id)
465 self.depending.pop(msg_id)
451 for mid in follow.union(after):
466 for mid in follow.union(after):
452 if mid in self.dependencies:
467 if mid in self.dependencies:
453 self.dependencies[mid].remove(msg_id)
468 self.dependencies[mid].remove(msg_id)
454
469
455 #----------------------------------------------------------------------
470 #----------------------------------------------------------------------
456 # methods to be overridden by subclasses
471 # methods to be overridden by subclasses
457 #----------------------------------------------------------------------
472 #----------------------------------------------------------------------
458
473
459 def add_job(self, idx):
474 def add_job(self, idx):
460 """Called after self.targets[idx] just got the job with header.
475 """Called after self.targets[idx] just got the job with header.
461 Override with subclasses. The default ordering is simple LRU.
476 Override with subclasses. The default ordering is simple LRU.
462 The default loads are the number of outstanding jobs."""
477 The default loads are the number of outstanding jobs."""
463 self.loads[idx] += 1
478 self.loads[idx] += 1
464 for lis in (self.targets, self.loads):
479 for lis in (self.targets, self.loads):
465 lis.append(lis.pop(idx))
480 lis.append(lis.pop(idx))
466
481
467
482
468 def finish_job(self, idx):
483 def finish_job(self, idx):
469 """Called after self.targets[idx] just finished a job.
484 """Called after self.targets[idx] just finished a job.
470 Override with subclasses."""
485 Override with subclasses."""
471 self.loads[idx] -= 1
486 self.loads[idx] -= 1
472
487
473
488
474
489
475 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
490 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
476 from zmq.eventloop import ioloop
491 from zmq.eventloop import ioloop
477 from zmq.eventloop.zmqstream import ZMQStream
492 from zmq.eventloop.zmqstream import ZMQStream
478
493
479 ctx = zmq.Context()
494 ctx = zmq.Context()
480 loop = ioloop.IOLoop()
495 loop = ioloop.IOLoop()
481
496
482 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
497 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
483 ins.bind(in_addr)
498 ins.bind(in_addr)
484 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
499 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
485 outs.bind(out_addr)
500 outs.bind(out_addr)
486 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
501 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
487 mons.connect(mon_addr)
502 mons.connect(mon_addr)
488 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
503 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
489 nots.setsockopt(zmq.SUBSCRIBE, '')
504 nots.setsockopt(zmq.SUBSCRIBE, '')
490 nots.connect(not_addr)
505 nots.connect(not_addr)
491
506
492 scheme = globals().get(scheme, None)
507 scheme = globals().get(scheme, None)
493 # setup logging
508 # setup logging
494 if log_addr:
509 if log_addr:
495 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
510 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
496 else:
511 else:
497 local_logger(logname, loglevel)
512 local_logger(logname, loglevel)
498
513
499 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
514 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
500 mon_stream=mons,notifier_stream=nots,
515 mon_stream=mons, notifier_stream=nots,
501 scheme=scheme,io_loop=loop, logname=logname)
516 scheme=scheme, loop=loop, logname=logname)
502
517 scheduler.start()
503 try:
518 try:
504 loop.start()
519 loop.start()
505 except KeyboardInterrupt:
520 except KeyboardInterrupt:
506 print ("interrupted, exiting...", file=sys.__stderr__)
521 print ("interrupted, exiting...", file=sys.__stderr__)
507
522
508
523
509 if __name__ == '__main__':
524 if __name__ == '__main__':
510 iface = 'tcp://127.0.0.1:%i'
525 iface = 'tcp://127.0.0.1:%i'
511 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
526 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
General Comments 0
You need to be logged in to leave comments. Login now